diff options
author | Christian Grothoff <christian@grothoff.org> | 2019-02-11 20:39:36 +0100 |
---|---|---|
committer | Christian Grothoff <christian@grothoff.org> | 2019-02-11 20:39:36 +0100 |
commit | 1f59e703d82b47f3aeaf432045a2633c2841169b (patch) | |
tree | 6af5609b388cf1906a29b5d572bec2dd8fb2ae1c /src/psycstore |
initial import from gnunet.git
Diffstat (limited to 'src/psycstore')
-rw-r--r-- | src/psycstore/.gitignore | 5 | ||||
-rw-r--r-- | src/psycstore/Makefile.am | 155 | ||||
-rw-r--r-- | src/psycstore/gnunet-service-psycstore.c | 1049 | ||||
-rw-r--r-- | src/psycstore/plugin_psycstore_mysql.c | 1960 | ||||
-rw-r--r-- | src/psycstore/plugin_psycstore_postgres.c | 1530 | ||||
-rw-r--r-- | src/psycstore/plugin_psycstore_sqlite.c | 1948 | ||||
-rw-r--r-- | src/psycstore/psycstore.conf.in | 28 | ||||
-rw-r--r-- | src/psycstore/psycstore.h | 520 | ||||
-rw-r--r-- | src/psycstore/psycstore_api.c | 1285 | ||||
-rw-r--r-- | src/psycstore/test_plugin_psycstore.c | 532 | ||||
-rw-r--r-- | src/psycstore/test_plugin_psycstore_mysql.conf | 7 | ||||
-rw-r--r-- | src/psycstore/test_plugin_psycstore_postgres.conf | 2 | ||||
-rw-r--r-- | src/psycstore/test_plugin_psycstore_sqlite.conf | 2 | ||||
-rw-r--r-- | src/psycstore/test_psycstore.c | 586 | ||||
-rw-r--r-- | src/psycstore/test_psycstore.conf | 8 |
15 files changed, 9617 insertions, 0 deletions
diff --git a/src/psycstore/.gitignore b/src/psycstore/.gitignore new file mode 100644 index 0000000..5ec7832 --- /dev/null +++ b/src/psycstore/.gitignore @@ -0,0 +1,5 @@ +gnunet-service-psycstore +test_plugin_psycstore_mysql +test_plugin_psycstore_sqlite +test_plugin_psycstore_postgres +test_psycstore diff --git a/src/psycstore/Makefile.am b/src/psycstore/Makefile.am new file mode 100644 index 0000000..557bb42 --- /dev/null +++ b/src/psycstore/Makefile.am @@ -0,0 +1,155 @@ +# This Makefile.am is in the public domain +AM_CPPFLAGS = -I$(top_srcdir)/src/include + +plugindir = $(libdir)/gnunet + +pkgcfgdir= $(pkgdatadir)/config.d/ + +libexecdir= $(pkglibdir)/libexec/ + +pkgcfg_DATA = \ + psycstore.conf + + +if MINGW + WINFLAGS = -Wl,--no-undefined -Wl,--export-all-symbols +endif + +if USE_COVERAGE + AM_CFLAGS = --coverage -O0 + XLIB = -lgcov +endif + +if HAVE_MYSQL +MYSQL_PLUGIN = libgnunet_plugin_psycstore_mysql.la +if HAVE_TESTING +MYSQL_TESTS = test_plugin_psycstore_mysql +endif +endif + +if HAVE_POSTGRESQL +POSTGRES_PLUGIN = libgnunet_plugin_psycstore_postgres.la +if HAVE_TESTING +POSTGRES_TESTS = test_plugin_psycstore_postgres +endif +endif + +if HAVE_SQLITE +SQLITE_PLUGIN = libgnunet_plugin_psycstore_sqlite.la +if HAVE_TESTING +SQLITE_TESTS = test_plugin_psycstore_sqlite +endif +endif + +lib_LTLIBRARIES = libgnunetpsycstore.la + +libgnunetpsycstore_la_SOURCES = \ + psycstore_api.c \ + psycstore.h +libgnunetpsycstore_la_LIBADD = \ + $(top_builddir)/src/util/libgnunetutil.la \ + $(GN_LIBINTL) $(XLIB) +libgnunetpsycstore_la_LDFLAGS = \ + $(GN_LIB_LDFLAGS) $(WINFLAGS) \ + -version-info 0:0:0 + +bin_PROGRAMS = + +libexec_PROGRAMS = \ + gnunet-service-psycstore + +gnunet_service_psycstore_SOURCES = \ + gnunet-service-psycstore.c +gnunet_service_psycstore_LDADD = \ + $(top_builddir)/src/statistics/libgnunetstatistics.la \ + $(top_builddir)/src/util/libgnunetutil.la \ + $(top_builddir)/src/psycutil/libgnunetpsycutil.la \ + $(GN_LIBINTL) + +plugin_LTLIBRARIES = \ + $(SQLITE_PLUGIN) \ + $(MYSQL_PLUGIN) \ + $(POSTGRES_PLUGIN) + + +libgnunet_plugin_psycstore_mysql_la_SOURCES = \ + plugin_psycstore_mysql.c +libgnunet_plugin_psycstore_mysql_la_LIBADD = \ + libgnunetpsycstore.la \ + $(top_builddir)/src/my/libgnunetmy.la \ + $(top_builddir)/src/mysql/libgnunetmysql.la \ + $(top_builddir)/src/statistics/libgnunetstatistics.la \ + $(top_builddir)/src/util/libgnunetutil.la $(XLIBS) \ + $(LTLIBINTL) +libgnunet_plugin_psycstore_mysql_la_LDFLAGS = \ + $(GN_PLUGIN_LDFLAGS) + +libgnunet_plugin_psycstore_postgres_la_SOURCES = \ + plugin_psycstore_postgres.c +libgnunet_plugin_psycstore_postgres_la_LIBADD = \ + libgnunetpsycstore.la \ + $(top_builddir)/src/pq/libgnunetpq.la \ + $(top_builddir)/src/statistics/libgnunetstatistics.la \ + $(top_builddir)/src/util/libgnunetutil.la $(XLIBS) -lpq \ + $(LTLIBINTL) +libgnunet_plugin_psycstore_postgres_la_LDFLAGS = \ + $(GN_PLUGIN_LDFLAGS) $(POSTGRESQL_LDFLAGS) +libgnunet_plugin_psycstore_postgres_la_CPPFLAGS = \ + $(POSTGRESQL_CPPFLAGS) $(AM_CPPFLAGS) + + +libgnunet_plugin_psycstore_sqlite_la_SOURCES = \ + plugin_psycstore_sqlite.c +libgnunet_plugin_psycstore_sqlite_la_LIBADD = \ + libgnunetpsycstore.la \ + $(top_builddir)/src/statistics/libgnunetstatistics.la \ + $(top_builddir)/src/util/libgnunetutil.la $(XLIBS) -lsqlite3 \ + $(LTLIBINTL) +libgnunet_plugin_psycstore_sqlite_la_LDFLAGS = \ + $(GN_PLUGIN_LDFLAGS) + + +if HAVE_SQLITE +if HAVE_TESTING +check_PROGRAMS = \ + $(SQLITE_TESTS) \ + $(MYSQL_TESTS) \ + $(POSTGRES_TESTS) \ + test_psycstore +endif +endif + +if ENABLE_TEST_RUN +AM_TESTS_ENVIRONMENT=export GNUNET_PREFIX=$${GNUNET_PREFIX:-@libdir@};export PATH=$${GNUNET_PREFIX:-@prefix@}/bin:$$PATH;unset XDG_DATA_HOME;unset XDG_CONFIG_HOME; +TESTS = $(check_PROGRAMS) +endif + +test_psycstore_SOURCES = \ + test_psycstore.c +test_psycstore_LDADD = \ + libgnunetpsycstore.la \ + $(top_builddir)/src/testing/libgnunettesting.la \ + $(top_builddir)/src/util/libgnunetutil.la + +EXTRA_DIST = \ + test_psycstore.conf + + +test_plugin_psycstore_sqlite_SOURCES = \ + test_plugin_psycstore.c +test_plugin_psycstore_sqlite_LDADD = \ + $(top_builddir)/src/testing/libgnunettesting.la \ + $(top_builddir)/src/util/libgnunetutil.la + +test_plugin_psycstore_mysql_SOURCES = \ + test_plugin_psycstore.c +test_plugin_psycstore_mysql_LDADD = \ + $(top_builddir)/src/testing/libgnunettesting.la \ + $(top_builddir)/src/util/libgnunetutil.la + +test_plugin_psycstore_postgres_SOURCES = \ + test_plugin_psycstore.c +test_plugin_psycstore_postgres_LDADD = \ + $(top_builddir)/src/testing/libgnunettesting.la \ + $(top_builddir)/src/util/libgnunetutil.la + diff --git a/src/psycstore/gnunet-service-psycstore.c b/src/psycstore/gnunet-service-psycstore.c new file mode 100644 index 0000000..9aebd3e --- /dev/null +++ b/src/psycstore/gnunet-service-psycstore.c @@ -0,0 +1,1049 @@ +/** + * This file is part of GNUnet + * Copyright (C) 2013 GNUnet e.V. + * + * GNUnet is free software: you can redistribute it and/or modify it + * under the terms of the GNU Affero General Public License as published + * by the Free Software Foundation, either version 3 of the License, + * or (at your option) any later version. + * + * GNUnet is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + + SPDX-License-Identifier: AGPL3.0-or-later + */ + +/** + * @file psycstore/gnunet-service-psycstore.c + * @brief PSYCstore service + * @author Gabor X Toth + * @author Christian Grothoff + */ + +#include <inttypes.h> + +#include "platform.h" +#include "gnunet_util_lib.h" +#include "gnunet_constants.h" +#include "gnunet_protocols.h" +#include "gnunet_statistics_service.h" +#include "gnunet_psyc_util_lib.h" +#include "gnunet_psycstore_service.h" +#include "gnunet_psycstore_plugin.h" +#include "psycstore.h" + + +/** + * Handle to our current configuration. + */ +static const struct GNUNET_CONFIGURATION_Handle *cfg; + +/** + * Service handle. + */ +static struct GNUNET_SERVICE_Handle *service; + +/** + * Handle to the statistics service. + */ +static struct GNUNET_STATISTICS_Handle *stats; + +/** + * Database handle + */ +static struct GNUNET_PSYCSTORE_PluginFunctions *db; + +/** + * Name of the database plugin + */ +static char *db_lib_name; + + +/** + * Task run during shutdown. + * + * @param cls unused + */ +static void +shutdown_task (void *cls) +{ + if (NULL != stats) + { + GNUNET_STATISTICS_destroy (stats, GNUNET_NO); + stats = NULL; + } + GNUNET_break (NULL == GNUNET_PLUGIN_unload (db_lib_name, db)); + GNUNET_free (db_lib_name); + db_lib_name = NULL; +} + + +/** + * Send a result code back to the client. + * + * @param client + * Client that should receive the result code. + * @param result_code + * Code to transmit. + * @param op_id + * Operation ID in network byte order. + * @param err_msg + * Error message to include (or NULL for none). + */ +static void +send_result_code (struct GNUNET_SERVICE_Client *client, + uint64_t op_id, + int64_t result_code, + const char *err_msg) +{ + struct OperationResult *res; + size_t err_size = 0; + + if (NULL != err_msg) + err_size = strnlen (err_msg, + GNUNET_MAX_MESSAGE_SIZE - sizeof (*res) - 1) + 1; + struct GNUNET_MQ_Envelope * + env = GNUNET_MQ_msg_extra (res, err_size, + GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_CODE); + res->result_code = GNUNET_htonll (result_code - INT64_MIN); + res->op_id = op_id; + if (0 < err_size) + { + GNUNET_memcpy (&res[1], err_msg, err_size); + ((char *) &res[1])[err_size - 1] = '\0'; + } + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Sending result to client: %" PRId64 " (%s)\n", + result_code, err_msg); + GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client), env); +} + + +enum +{ + MEMBERSHIP_TEST_NOT_NEEDED = 0, + MEMBERSHIP_TEST_NEEDED = 1, + MEMBERSHIP_TEST_DONE = 2, +} MessageMembershipTest; + + +struct SendClosure +{ + struct GNUNET_SERVICE_Client *client; + + /** + * Channel's public key. + */ + struct GNUNET_CRYPTO_EddsaPublicKey channel_key; + + /** + * Slave's public key. + */ + struct GNUNET_CRYPTO_EcdsaPublicKey slave_key; + + /** + * Operation ID. + */ + uint64_t op_id; + + /** + * Membership test result. + */ + int membership_test_result; + + /** + * Do membership test with @a slave_key before returning fragment? + * @see enum MessageMembershipTest + */ + uint8_t membership_test; +}; + + +static int +send_fragment (void *cls, struct GNUNET_MULTICAST_MessageHeader *msg, + enum GNUNET_PSYCSTORE_MessageFlags flags) +{ + struct SendClosure *sc = cls; + struct FragmentResult *res; + + if (MEMBERSHIP_TEST_NEEDED == sc->membership_test) + { + sc->membership_test = MEMBERSHIP_TEST_DONE; + sc->membership_test_result + = db->membership_test (db->cls, &sc->channel_key, &sc->slave_key, + GNUNET_ntohll (msg->message_id)); + switch (sc->membership_test_result) + { + case GNUNET_YES: + break; + + case GNUNET_NO: + case GNUNET_SYSERR: + return GNUNET_NO; + } + } + + size_t msg_size = ntohs (msg->header.size); + + struct GNUNET_MQ_Envelope * + env = GNUNET_MQ_msg_extra (res, msg_size, + GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_FRAGMENT); + res->op_id = sc->op_id; + res->psycstore_flags = htonl (flags); + GNUNET_memcpy (&res[1], msg, msg_size); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Sending fragment %llu to client\n", + (unsigned long long) GNUNET_ntohll (msg->fragment_id)); + GNUNET_free (msg); + + GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (sc->client), env); + return GNUNET_YES; +} + + +static int +send_state_var (void *cls, const char *name, + const void *value, uint32_t value_size) +{ + struct SendClosure *sc = cls; + struct StateResult *res; + size_t name_size = strlen (name) + 1; + + /** @todo FIXME: split up value into 64k chunks */ + + struct GNUNET_MQ_Envelope * + env = GNUNET_MQ_msg_extra (res, name_size + value_size, + GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_STATE); + res->op_id = sc->op_id; + res->name_size = htons (name_size); + GNUNET_memcpy (&res[1], name, name_size); + GNUNET_memcpy ((char *) &res[1] + name_size, value, value_size); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Sending state variable %s to client\n", name); + + GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (sc->client), env); + return GNUNET_OK; +} + + +static void +handle_client_membership_store (void *cls, + const struct MembershipStoreRequest *req) +{ + struct GNUNET_SERVICE_Client *client = cls; + + int ret = db->membership_store (db->cls, &req->channel_key, &req->slave_key, + req->did_join, + GNUNET_ntohll (req->announced_at), + GNUNET_ntohll (req->effective_since), + GNUNET_ntohll (req->group_generation)); + + if (ret != GNUNET_OK) + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + _("Failed to store membership information!\n")); + + send_result_code (client, req->op_id, ret, NULL); + GNUNET_SERVICE_client_continue (client); +} + + +static void +handle_client_membership_test (void *cls, + const struct MembershipTestRequest *req) +{ + struct GNUNET_SERVICE_Client *client = cls; + + int ret = db->membership_test (db->cls, &req->channel_key, &req->slave_key, + GNUNET_ntohll (req->message_id)); + switch (ret) + { + case GNUNET_YES: + case GNUNET_NO: + break; + default: + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + _("Failed to test membership!\n")); + } + + send_result_code (client, req->op_id, ret, NULL); + GNUNET_SERVICE_client_continue (client); +} + + +static int +check_client_fragment_store (void *cls, + const struct FragmentStoreRequest *req) +{ + return GNUNET_OK; +} + + +static void +handle_client_fragment_store (void *cls, + const struct FragmentStoreRequest *req) +{ + struct GNUNET_SERVICE_Client *client = cls; + + const struct GNUNET_MessageHeader * + msg = GNUNET_MQ_extract_nested_mh (req); + if (NULL == msg + || ntohs (msg->size) < sizeof (struct GNUNET_MULTICAST_MessageHeader)) + { + GNUNET_break (0); + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + _("Dropping invalid fragment\n")); + GNUNET_SERVICE_client_drop (client); + return; + } + + int ret = db->fragment_store (db->cls, &req->channel_key, + (const struct GNUNET_MULTICAST_MessageHeader *) + msg, ntohl (req->psycstore_flags)); + + if (ret != GNUNET_OK) + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + _("Failed to store fragment\n")); + + send_result_code (client, req->op_id, ret, NULL); + GNUNET_SERVICE_client_continue (client); +} + + +static void +handle_client_fragment_get (void *cls, + const struct FragmentGetRequest *req) +{ + struct GNUNET_SERVICE_Client *client = cls; + + struct SendClosure + sc = { .op_id = req->op_id, + .client = client, + .channel_key = req->channel_key, + .slave_key = req->slave_key, + .membership_test = req->do_membership_test }; + + int64_t ret; + uint64_t ret_frags = 0; + uint64_t first_fragment_id = GNUNET_ntohll (req->first_fragment_id); + uint64_t last_fragment_id = GNUNET_ntohll (req->last_fragment_id); + uint64_t limit = GNUNET_ntohll (req->fragment_limit); + + if (0 == limit) + ret = db->fragment_get (db->cls, &req->channel_key, + first_fragment_id, last_fragment_id, + &ret_frags, send_fragment, &sc); + else + ret = db->fragment_get_latest (db->cls, &req->channel_key, limit, + &ret_frags, send_fragment, &sc); + + switch (ret) + { + case GNUNET_YES: + case GNUNET_NO: + if (MEMBERSHIP_TEST_DONE == sc.membership_test) + { + switch (sc.membership_test_result) + { + case GNUNET_YES: + break; + + case GNUNET_NO: + ret = GNUNET_PSYCSTORE_MEMBERSHIP_TEST_FAILED; + break; + + case GNUNET_SYSERR: + ret = GNUNET_SYSERR; + break; + } + } + break; + default: + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + _("Failed to get fragment!\n")); + } + send_result_code (client, req->op_id, (ret < 0) ? ret : ret_frags, NULL); + GNUNET_SERVICE_client_continue (client); +} + + +static int +check_client_message_get (void *cls, + const struct MessageGetRequest *req) +{ + return GNUNET_OK; +} + + +static void +handle_client_message_get (void *cls, + const struct MessageGetRequest *req) +{ + struct GNUNET_SERVICE_Client *client = cls; + + uint16_t size = ntohs (req->header.size); + const char *method_prefix = (const char *) &req[1]; + + if (size < sizeof (*req) + 1 + || '\0' != method_prefix[size - sizeof (*req) - 1]) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Message get: invalid method prefix. size: %u < %u?\n", + size, + (unsigned int) (sizeof (*req) + 1)); + GNUNET_break (0); + GNUNET_SERVICE_client_drop (client); + return; + } + + struct SendClosure + sc = { .op_id = req->op_id, + .client = client, + .channel_key = req->channel_key, + .slave_key = req->slave_key, + .membership_test = req->do_membership_test }; + + int64_t ret; + uint64_t ret_frags = 0; + uint64_t first_message_id = GNUNET_ntohll (req->first_message_id); + uint64_t last_message_id = GNUNET_ntohll (req->last_message_id); + uint64_t msg_limit = GNUNET_ntohll (req->message_limit); + uint64_t frag_limit = GNUNET_ntohll (req->fragment_limit); + + /** @todo method_prefix */ + if (0 == msg_limit) + ret = db->message_get (db->cls, &req->channel_key, + first_message_id, last_message_id, frag_limit, + &ret_frags, send_fragment, &sc); + else + ret = db->message_get_latest (db->cls, &req->channel_key, msg_limit, + &ret_frags, send_fragment, &sc); + + switch (ret) + { + case GNUNET_YES: + case GNUNET_NO: + break; + default: + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + _("Failed to get message!\n")); + } + + send_result_code (client, req->op_id, (ret < 0) ? ret : ret_frags, NULL); + GNUNET_SERVICE_client_continue (client); +} + + +static void +handle_client_message_get_fragment (void *cls, + const struct MessageGetFragmentRequest *req) +{ + struct GNUNET_SERVICE_Client *client = cls; + + struct SendClosure + sc = { .op_id = req->op_id, .client = client, + .channel_key = req->channel_key, .slave_key = req->slave_key, + .membership_test = req->do_membership_test }; + + int ret = db->message_get_fragment (db->cls, &req->channel_key, + GNUNET_ntohll (req->message_id), + GNUNET_ntohll (req->fragment_offset), + &send_fragment, &sc); + switch (ret) + { + case GNUNET_YES: + case GNUNET_NO: + break; + default: + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + _("Failed to get message fragment!\n")); + } + + send_result_code (client, req->op_id, ret, NULL); + GNUNET_SERVICE_client_continue (client); +} + + +static void +handle_client_counters_get (void *cls, + const struct OperationRequest *req) +{ + struct GNUNET_SERVICE_Client *client = cls; + + struct CountersResult *res; + struct GNUNET_MQ_Envelope * + env = GNUNET_MQ_msg (res, GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_COUNTERS); + + int ret = db->counters_message_get (db->cls, &req->channel_key, + &res->max_fragment_id, &res->max_message_id, + &res->max_group_generation); + switch (ret) + { + case GNUNET_OK: + ret = db->counters_state_get (db->cls, &req->channel_key, + &res->max_state_message_id); + case GNUNET_NO: + break; + default: + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + _("Failed to get master counters!\n")); + } + + res->result_code = htonl (ret); + res->op_id = req->op_id; + res->max_fragment_id = GNUNET_htonll (res->max_fragment_id); + res->max_message_id = GNUNET_htonll (res->max_message_id); + res->max_group_generation = GNUNET_htonll (res->max_group_generation); + res->max_state_message_id = GNUNET_htonll (res->max_state_message_id); + + GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client), env); + GNUNET_SERVICE_client_continue (client); +} + + +struct StateModifyClosure +{ + const struct GNUNET_CRYPTO_EddsaPublicKey channel_key; + struct GNUNET_PSYC_ReceiveHandle *recv; + enum GNUNET_PSYC_MessageState msg_state; + char mod_oper; + char *mod_name; + char *mod_value; + uint32_t mod_value_size; + uint32_t mod_value_remaining; +}; + + +static void +recv_state_message_part (void *cls, + const struct GNUNET_PSYC_MessageHeader *msg, + const struct GNUNET_MessageHeader *pmsg) +{ + struct StateModifyClosure *scls = cls; + uint16_t psize; + + if (NULL == msg) + { // FIXME: error on unknown message + return; + } + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "recv_state_message_part() message_id: %" PRIu64 + ", fragment_offset: %" PRIu64 ", flags: %u\n", + GNUNET_ntohll (msg->message_id), + GNUNET_ntohll (msg->fragment_offset), + ntohl (msg->flags)); + + if (NULL == pmsg) + { + scls->msg_state = GNUNET_PSYC_MESSAGE_STATE_ERROR; + return; + } + + switch (ntohs (pmsg->type)) + { + case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_METHOD: + { + scls->msg_state = GNUNET_PSYC_MESSAGE_STATE_METHOD; + break; + } + + case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MODIFIER: + { + struct GNUNET_PSYC_MessageModifier * + pmod = (struct GNUNET_PSYC_MessageModifier *) pmsg; + psize = ntohs (pmod->header.size); + uint16_t name_size = ntohs (pmod->name_size); + uint32_t value_size = ntohl (pmod->value_size); + + const char *name = (const char *) &pmod[1]; + const void *value = name + name_size; + + if (GNUNET_PSYC_OP_SET != pmod->oper) + { // Apply non-transient operation. + if (psize == sizeof (*pmod) + name_size + value_size) + { + db->state_modify_op (db->cls, &scls->channel_key, + pmod->oper, name, value, value_size); + } + else + { + scls->mod_oper = pmod->oper; + scls->mod_name = GNUNET_malloc (name_size); + GNUNET_memcpy (scls->mod_name, name, name_size); + + scls->mod_value_size = value_size; + scls->mod_value = GNUNET_malloc (scls->mod_value_size); + scls->mod_value_remaining + = scls->mod_value_size - (psize - sizeof (*pmod) - name_size); + GNUNET_memcpy (scls->mod_value, value, value_size - scls->mod_value_remaining); + } + } + scls->msg_state = GNUNET_PSYC_MESSAGE_STATE_MODIFIER; + break; + } + + case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_MOD_CONT: + if (GNUNET_PSYC_OP_SET != scls->mod_oper) + { + if (scls->mod_value_remaining == 0) + { + GNUNET_break_op (0); + scls->msg_state = GNUNET_PSYC_MESSAGE_STATE_ERROR; + } + psize = ntohs (pmsg->size); + GNUNET_memcpy (scls->mod_value + (scls->mod_value_size - scls->mod_value_remaining), + &pmsg[1], psize - sizeof (*pmsg)); + scls->mod_value_remaining -= psize - sizeof (*pmsg); + if (0 == scls->mod_value_remaining) + { + db->state_modify_op (db->cls, &scls->channel_key, + scls->mod_oper, scls->mod_name, + scls->mod_value, scls->mod_value_size); + GNUNET_free (scls->mod_name); + GNUNET_free (scls->mod_value); + scls->mod_oper = 0; + scls->mod_name = NULL; + scls->mod_value = NULL; + scls->mod_value_size = 0; + } + } + scls->msg_state = GNUNET_PSYC_MESSAGE_STATE_MOD_CONT; + break; + + case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_DATA: + scls->msg_state = GNUNET_PSYC_MESSAGE_STATE_DATA; + break; + + case GNUNET_MESSAGE_TYPE_PSYC_MESSAGE_END: + scls->msg_state = GNUNET_PSYC_MESSAGE_STATE_END; + break; + + default: + scls->msg_state = GNUNET_PSYC_MESSAGE_STATE_ERROR; + } +} + + +static int +recv_state_fragment (void *cls, struct GNUNET_MULTICAST_MessageHeader *msg, + enum GNUNET_PSYCSTORE_MessageFlags flags) +{ + struct StateModifyClosure *scls = cls; + + if (NULL == scls->recv) + { + scls->recv = GNUNET_PSYC_receive_create (NULL, recv_state_message_part, + scls); + } + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "recv_state_fragment: %" PRIu64 "\n", GNUNET_ntohll (msg->fragment_id)); + + struct GNUNET_PSYC_MessageHeader * + pmsg = GNUNET_PSYC_message_header_create (msg, flags); + GNUNET_PSYC_receive_message (scls->recv, pmsg); + GNUNET_free (pmsg); + + return GNUNET_YES; +} + + +static void +handle_client_state_modify (void *cls, + const struct StateModifyRequest *req) +{ + struct GNUNET_SERVICE_Client *client = cls; + + uint64_t message_id = GNUNET_ntohll (req->message_id); + uint64_t state_delta = GNUNET_ntohll (req->state_delta); + uint64_t ret_frags = 0; + struct StateModifyClosure + scls = { .channel_key = req->channel_key }; + + int ret = db->state_modify_begin (db->cls, &req->channel_key, + message_id, state_delta); + + if (GNUNET_OK != ret) + { + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + _("Failed to begin modifying state: %d\n"), ret); + } + else + { + ret = db->message_get (db->cls, &req->channel_key, + message_id, message_id, 0, + &ret_frags, recv_state_fragment, &scls); + if (GNUNET_OK != ret) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + _("Failed to modify state: %d\n"), ret); + GNUNET_break (0); + } + else + { + if (GNUNET_OK != db->state_modify_end (db->cls, &req->channel_key, message_id)) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + _("Failed to end modifying state!\n")); + GNUNET_break (0); + } + } + if (NULL != scls.recv) + { + GNUNET_PSYC_receive_destroy (scls.recv); + } + } + + send_result_code (client, req->op_id, ret, NULL); + GNUNET_SERVICE_client_continue (client); +} + + +static int +check_client_state_sync (void *cls, + const struct StateSyncRequest *req) +{ + return GNUNET_OK; +} + + +/** @todo FIXME: stop processing further state sync messages after an error */ +static void +handle_client_state_sync (void *cls, + const struct StateSyncRequest *req) +{ + struct GNUNET_SERVICE_Client *client = cls; + + int ret = GNUNET_SYSERR; + const char *name = (const char *) &req[1]; + uint16_t name_size = ntohs (req->name_size); + + if (name_size <= 2 || '\0' != name[name_size - 1]) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + _("Tried to set invalid state variable name!\n")); + GNUNET_break_op (0); + } + else + { + ret = GNUNET_OK; + + if (req->flags & STATE_OP_FIRST) + { + ret = db->state_sync_begin (db->cls, &req->channel_key); + } + if (ret != GNUNET_OK) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + _("Failed to begin synchronizing state!\n")); + } + else + { + ret = db->state_sync_assign (db->cls, &req->channel_key, name, + name + ntohs (req->name_size), + ntohs (req->header.size) - sizeof (*req) + - ntohs (req->name_size)); + } + + if (GNUNET_OK == ret && req->flags & STATE_OP_LAST) + { + ret = db->state_sync_end (db->cls, &req->channel_key, + GNUNET_ntohll (req->max_state_message_id), + GNUNET_ntohll (req->state_hash_message_id)); + if (ret != GNUNET_OK) + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + _("Failed to end synchronizing state!\n")); + } + } + send_result_code (client, req->op_id, ret, NULL); + GNUNET_SERVICE_client_continue (client); +} + + +static void +handle_client_state_reset (void *cls, + const struct OperationRequest *req) +{ + struct GNUNET_SERVICE_Client *client = cls; + + int ret = db->state_reset (db->cls, &req->channel_key); + + if (ret != GNUNET_OK) + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + _("Failed to reset state!\n")); + + send_result_code (client, req->op_id, ret, NULL); + GNUNET_SERVICE_client_continue (client); +} + + +static void +handle_client_state_hash_update (void *cls, + const struct StateHashUpdateRequest *req) +{ + struct GNUNET_SERVICE_Client *client = cls; + + int ret = db->state_reset (db->cls, &req->channel_key); + if (ret != GNUNET_OK) + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + _("Failed to reset state!\n")); + + send_result_code (client, req->op_id, ret, NULL); + GNUNET_SERVICE_client_continue (client); +} + + +static int +check_client_state_get (void *cls, + const struct OperationRequest *req) +{ + return GNUNET_OK; +} + + +static void +handle_client_state_get (void *cls, + const struct OperationRequest *req) +{ + struct GNUNET_SERVICE_Client *client = cls; + + struct SendClosure sc = { .op_id = req->op_id, .client = client }; + int64_t ret = GNUNET_SYSERR; + const char *name = (const char *) &req[1]; + uint16_t name_size = ntohs (req->header.size) - sizeof (*req); + + if (name_size <= 2 || '\0' != name[name_size - 1]) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + _("Tried to get invalid state variable name!\n")); + GNUNET_break (0); + } + else + { + ret = db->state_get (db->cls, &req->channel_key, name, + &send_state_var, &sc); + if (GNUNET_NO == ret && name_size >= 5) /* min: _a_b\0 */ + { + char *p, *n = GNUNET_malloc (name_size); + GNUNET_memcpy (n, name, name_size); + while (&n[1] < (p = strrchr (n, '_')) && GNUNET_NO == ret) + { + *p = '\0'; + ret = db->state_get (db->cls, &req->channel_key, n, + &send_state_var, &sc); + } + GNUNET_free (n); + } + } + switch (ret) + { + case GNUNET_OK: + case GNUNET_NO: + break; + default: + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + _("Failed to get state variable!\n")); + } + + send_result_code (client, req->op_id, ret, NULL); + GNUNET_SERVICE_client_continue (client); +} + + +static int +check_client_state_get_prefix (void *cls, + const struct OperationRequest *req) +{ + return GNUNET_OK; +} + + +static void +handle_client_state_get_prefix (void *cls, + const struct OperationRequest *req) +{ + struct GNUNET_SERVICE_Client *client = cls; + + struct SendClosure sc = { .op_id = req->op_id, .client = client }; + int64_t ret = GNUNET_SYSERR; + const char *name = (const char *) &req[1]; + uint16_t name_size = ntohs (req->header.size) - sizeof (*req); + + if (name_size <= 1 || '\0' != name[name_size - 1]) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + _("Tried to get invalid state variable name!\n")); + GNUNET_break (0); + } + else + { + ret = db->state_get_prefix (db->cls, &req->channel_key, name, + &send_state_var, &sc); + } + switch (ret) + { + case GNUNET_OK: + case GNUNET_NO: + break; + default: + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + _("Failed to get state variable!\n")); + } + + send_result_code (client, req->op_id, ret, NULL); + GNUNET_SERVICE_client_continue (client); +} + + +/** + * A new client connected. + * + * @param cls NULL + * @param client client to add + * @param mq message queue for @a client + * @return @a client + */ +static void * +client_notify_connect (void *cls, + struct GNUNET_SERVICE_Client *client, + struct GNUNET_MQ_Handle *mq) +{ + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Client connected: %p\n", client); + + return client; +} + + +/** + * Called whenever a client is disconnected. + * Frees our resources associated with that client. + * + * @param cls closure + * @param client identification of the client + * @param app_ctx must match @a client + */ +static void +client_notify_disconnect (void *cls, + struct GNUNET_SERVICE_Client *client, + void *app_ctx) +{ +} + + +/** + * Initialize the PSYCstore service. + * + * @param cls Closure. + * @param server The initialized server. + * @param c Configuration to use. + */ +static void +run (void *cls, + const struct GNUNET_CONFIGURATION_Handle *c, + struct GNUNET_SERVICE_Handle *svc) +{ + cfg = c; + service = svc; + + /* Loading database plugin */ + char *database; + if (GNUNET_OK != + GNUNET_CONFIGURATION_get_value_string (cfg, "psycstore", "database", + &database)) + { + GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR, + "psycstore", + "database"); + } + else + { + GNUNET_asprintf (&db_lib_name, + "libgnunet_plugin_psycstore_%s", + database); + db = GNUNET_PLUGIN_load (db_lib_name, (void *) cfg); + GNUNET_free (database); + } + if (NULL == db) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Could not load database backend `%s'\n", + db_lib_name); + GNUNET_SCHEDULER_add_now (&shutdown_task, NULL); + return; + } + + stats = GNUNET_STATISTICS_create ("psycstore", cfg); + GNUNET_SCHEDULER_add_shutdown (shutdown_task, + NULL); +} + +/** + * Define "main" method using service macro. + */ +GNUNET_SERVICE_MAIN +("psycstore", + GNUNET_SERVICE_OPTION_NONE, + run, + client_notify_connect, + client_notify_disconnect, + NULL, + GNUNET_MQ_hd_fixed_size (client_membership_store, + GNUNET_MESSAGE_TYPE_PSYCSTORE_MEMBERSHIP_STORE, + struct MembershipStoreRequest, + NULL), + GNUNET_MQ_hd_fixed_size (client_membership_test, + GNUNET_MESSAGE_TYPE_PSYCSTORE_MEMBERSHIP_TEST, + struct MembershipTestRequest, + NULL), + GNUNET_MQ_hd_var_size (client_fragment_store, + GNUNET_MESSAGE_TYPE_PSYCSTORE_FRAGMENT_STORE, + struct FragmentStoreRequest, + NULL), + GNUNET_MQ_hd_fixed_size (client_fragment_get, + GNUNET_MESSAGE_TYPE_PSYCSTORE_FRAGMENT_GET, + struct FragmentGetRequest, + NULL), + GNUNET_MQ_hd_var_size (client_message_get, + GNUNET_MESSAGE_TYPE_PSYCSTORE_MESSAGE_GET, + struct MessageGetRequest, + NULL), + GNUNET_MQ_hd_fixed_size (client_message_get_fragment, + GNUNET_MESSAGE_TYPE_PSYCSTORE_MESSAGE_GET_FRAGMENT, + struct MessageGetFragmentRequest, + NULL), + GNUNET_MQ_hd_fixed_size (client_counters_get, + GNUNET_MESSAGE_TYPE_PSYCSTORE_COUNTERS_GET, + struct OperationRequest, + NULL), + GNUNET_MQ_hd_fixed_size (client_state_modify, + GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_MODIFY, + struct StateModifyRequest, + NULL), + GNUNET_MQ_hd_var_size (client_state_sync, + GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_SYNC, + struct StateSyncRequest, + NULL), + GNUNET_MQ_hd_fixed_size (client_state_reset, + GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_RESET, + struct OperationRequest, + NULL), + GNUNET_MQ_hd_fixed_size (client_state_hash_update, + GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_HASH_UPDATE, + struct StateHashUpdateRequest, + NULL), + GNUNET_MQ_hd_var_size (client_state_get, + GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_GET, + struct OperationRequest, + NULL), + GNUNET_MQ_hd_var_size (client_state_get_prefix, + GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_GET_PREFIX, + struct OperationRequest, + NULL)); + +/* end of gnunet-service-psycstore.c */ diff --git a/src/psycstore/plugin_psycstore_mysql.c b/src/psycstore/plugin_psycstore_mysql.c new file mode 100644 index 0000000..c36b6f7 --- /dev/null +++ b/src/psycstore/plugin_psycstore_mysql.c @@ -0,0 +1,1960 @@ +/* + * This file is part of GNUnet + * Copyright (C) 2013 GNUnet e.V. + * + * GNUnet is free software: you can redistribute it and/or modify it + * under the terms of the GNU Affero General Public License as published + * by the Free Software Foundation, either version 3 of the License, + * or (at your option) any later version. + * + * GNUnet is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + + SPDX-License-Identifier: AGPL3.0-or-later + */ + +/** + * @file psycstore/plugin_psycstore_mysql.c + * @brief mysql-based psycstore backend + * @author Gabor X Toth + * @author Christian Grothoff + * @author Christophe Genevey + */ + +#include "platform.h" +#include "gnunet_psycstore_plugin.h" +#include "gnunet_psycstore_service.h" +#include "gnunet_multicast_service.h" +#include "gnunet_crypto_lib.h" +#include "gnunet_psyc_util_lib.h" +#include "psycstore.h" +#include "gnunet_my_lib.h" +#include "gnunet_mysql_lib.h" +#include <mysql/mysql.h> + +/** + * After how many ms "busy" should a DB operation fail for good? A + * low value makes sure that we are more responsive to requests + * (especially PUTs). A high value guarantees a higher success rate + * (SELECTs in iterate can take several seconds despite LIMIT=1). + * + * The default value of 1s should ensure that users do not experience + * huge latencies while at the same time allowing operations to + * succeed with reasonable probability. + */ +#define BUSY_TIMEOUT_MS 1000 + +#define DEBUG_PSYCSTORE GNUNET_EXTRA_LOGGING + +/** + * Log an error message at log-level 'level' that indicates + * a failure of the command 'cmd' on file 'filename' + * with the message given by strerror(errno). + */ +#define LOG_MYSQL(db, level, cmd, stmt) \ + do { \ + GNUNET_log_from (level, "psycstore-mysql", \ + _("`%s' failed at %s:%d with error: %s\n"), \ + cmd, __FILE__, __LINE__, \ + mysql_stmt_error (GNUNET_MYSQL_statement_get_stmt(stmt))); \ + } while (0) + +#define LOG(kind,...) GNUNET_log_from (kind, "psycstore-mysql", __VA_ARGS__) + +enum Transactions { + TRANSACTION_NONE = 0, + TRANSACTION_STATE_MODIFY, + TRANSACTION_STATE_SYNC, +}; + +/** + * Context for all functions in this plugin. + */ +struct Plugin +{ + + const struct GNUNET_CONFIGURATION_Handle *cfg; + + /** + * MySQL context. + */ + struct GNUNET_MYSQL_Context *mc; + + /** + * Current transaction. + */ + enum Transactions transaction; + + /** + * Precompiled SQL for channel_key_store() + */ + struct GNUNET_MYSQL_StatementHandle *insert_channel_key; + + /** + * Precompiled SQL for slave_key_store() + */ + struct GNUNET_MYSQL_StatementHandle *insert_slave_key; + + /** + * Precompiled SQL for membership_store() + */ + struct GNUNET_MYSQL_StatementHandle *insert_membership; + + /** + * Precompiled SQL for membership_test() + */ + struct GNUNET_MYSQL_StatementHandle *select_membership; + + /** + * Precompiled SQL for fragment_store() + */ + struct GNUNET_MYSQL_StatementHandle *insert_fragment; + + /** + * Precompiled SQL for message_add_flags() + */ + struct GNUNET_MYSQL_StatementHandle *update_message_flags; + + /** + * Precompiled SQL for fragment_get() + */ + struct GNUNET_MYSQL_StatementHandle *select_fragments; + + /** + * Precompiled SQL for fragment_get() + */ + struct GNUNET_MYSQL_StatementHandle *select_latest_fragments; + + /** + * Precompiled SQL for message_get() + */ + struct GNUNET_MYSQL_StatementHandle *select_messages; + + /** + * Precompiled SQL for message_get() + */ + struct GNUNET_MYSQL_StatementHandle *select_latest_messages; + + /** + * Precompiled SQL for message_get_fragment() + */ + struct GNUNET_MYSQL_StatementHandle *select_message_fragment; + + /** + * Precompiled SQL for counters_get_message() + */ + struct GNUNET_MYSQL_StatementHandle *select_counters_message; + + /** + * Precompiled SQL for counters_get_state() + */ + struct GNUNET_MYSQL_StatementHandle *select_counters_state; + + /** + * Precompiled SQL for state_modify_end() + */ + struct GNUNET_MYSQL_StatementHandle *update_state_hash_message_id; + + /** + * Precompiled SQL for state_sync_end() + */ + struct GNUNET_MYSQL_StatementHandle *update_max_state_message_id; + + /** + * Precompiled SQL for state_modify_op() + */ + struct GNUNET_MYSQL_StatementHandle *insert_state_current; + + /** + * Precompiled SQL for state_modify_end() + */ + struct GNUNET_MYSQL_StatementHandle *delete_state_empty; + + /** + * Precompiled SQL for state_set_signed() + */ + struct GNUNET_MYSQL_StatementHandle *update_state_signed; + + /** + * Precompiled SQL for state_sync() + */ + struct GNUNET_MYSQL_StatementHandle *insert_state_sync; + + /** + * Precompiled SQL for state_sync() + */ + struct GNUNET_MYSQL_StatementHandle *delete_state; + + /** + * Precompiled SQL for state_sync() + */ + struct GNUNET_MYSQL_StatementHandle *insert_state_from_sync; + + /** + * Precompiled SQL for state_sync() + */ + struct GNUNET_MYSQL_StatementHandle *delete_state_sync; + + /** + * Precompiled SQL for state_get_signed() + */ + struct GNUNET_MYSQL_StatementHandle *select_state_signed; + + /** + * Precompiled SQL for state_get() + */ + struct GNUNET_MYSQL_StatementHandle *select_state_one; + + /** + * Precompiled SQL for state_get_prefix() + */ + struct GNUNET_MYSQL_StatementHandle *select_state_prefix; + +}; + +#if DEBUG_PSYCSTORE + +static void +mysql_trace (void *cls, const char *sql) +{ + LOG(GNUNET_ERROR_TYPE_DEBUG, "MYSQL query:\n%s\n", sql); +} + +#endif + + +/** + * @brief Prepare a SQL statement + * + * @param dbh handle to the database + * @param sql SQL statement, UTF-8 encoded + * @param stmt set to the prepared statement + * @return #GNUNET_OK on success, #GNUNET_SYSERR on failure + */ +static int +mysql_prepare (struct GNUNET_MYSQL_Context *mc, + const char *sql, + struct GNUNET_MYSQL_StatementHandle **stmt) +{ + *stmt = GNUNET_MYSQL_statement_prepare (mc, + sql); + + if (NULL == *stmt) + { + LOG (GNUNET_ERROR_TYPE_ERROR, + _("Error preparing SQL query: %s\n %s\n"), + mysql_stmt_error (GNUNET_MYSQL_statement_get_stmt (*stmt)), + sql); + return GNUNET_SYSERR; + } + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Prepared `%s' / %p\n", + sql, + stmt); + return GNUNET_OK; +} + + +/** + * Initialize the database connections and associated + * data structures (create tables and indices + * as needed as well). + * + * @param plugin the plugin context (state for this module) + * @return #GNUNET_OK on success + */ +static int +database_setup (struct Plugin *plugin) +{ + /* Open database and precompile statements */ + plugin->mc = GNUNET_MYSQL_context_create (plugin->cfg, + "psycstore-mysql"); + + if (NULL == plugin->mc) + { + LOG (GNUNET_ERROR_TYPE_ERROR, + _("Unable to initialize Mysql.\n")); + return GNUNET_SYSERR; + } + +#define STMT_RUN(sql) \ + if (GNUNET_OK != \ + GNUNET_MYSQL_statement_run (plugin->mc, \ + sql)) \ + { \ + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, \ + _("Failed to run SQL statement `%s'\n"), \ + sql); \ + return GNUNET_SYSERR; \ + } + + /* Create tables */ + STMT_RUN ("CREATE TABLE IF NOT EXISTS channels (\n" + " id BIGINT UNSIGNED AUTO_INCREMENT,\n" + " pub_key BLOB(32),\n" + " max_state_message_id BIGINT UNSIGNED,\n" + " state_hash_message_id BIGINT UNSIGNED,\n" + " PRIMARY KEY(id),\n" + " UNIQUE KEY(pub_key(32))\n" + ");"); + + STMT_RUN ("CREATE TABLE IF NOT EXISTS slaves (\n" + " id BIGINT UNSIGNED AUTO_INCREMENT,\n" + " pub_key BLOB(32),\n" + " PRIMARY KEY(id),\n" + " UNIQUE KEY(pub_key(32))\n" + ");"); + + STMT_RUN ("CREATE TABLE IF NOT EXISTS membership (\n" + " channel_id BIGINT UNSIGNED NOT NULL REFERENCES channels(id),\n" + " slave_id BIGINT UNSIGNED NOT NULL REFERENCES slaves(id),\n" + " did_join TINYINT NOT NULL,\n" + " announced_at BIGINT UNSIGNED NOT NULL,\n" + " effective_since BIGINT UNSIGNED NOT NULL,\n" + " group_generation BIGINT UNSIGNED NOT NULL\n" + ");"); + +/*** FIX because IF NOT EXISTS doesn't work ***/ + GNUNET_MYSQL_statement_run (plugin->mc, + "CREATE INDEX idx_membership_channel_id_slave_id " + "ON membership (channel_id, slave_id);"); + + /** @todo messages table: add method_name column */ + STMT_RUN ("CREATE TABLE IF NOT EXISTS messages (\n" + " channel_id BIGINT UNSIGNED NOT NULL REFERENCES channels(id),\n" + " hop_counter BIGINT UNSIGNED NOT NULL,\n" + " signature BLOB,\n" + " purpose BLOB,\n" + " fragment_id BIGINT UNSIGNED NOT NULL,\n" + " fragment_offset BIGINT UNSIGNED NOT NULL,\n" + " message_id BIGINT UNSIGNED NOT NULL,\n" + " group_generation BIGINT UNSIGNED NOT NULL,\n" + " multicast_flags BIGINT UNSIGNED NOT NULL,\n" + " psycstore_flags BIGINT UNSIGNED NOT NULL,\n" + " data BLOB,\n" + " PRIMARY KEY (channel_id, fragment_id),\n" + " UNIQUE KEY(channel_id, message_id, fragment_offset)\n" + ");"); + + STMT_RUN ("CREATE TABLE IF NOT EXISTS state (\n" + " channel_id BIGINT UNSIGNED NOT NULL REFERENCES channels(id),\n" + " name TEXT NOT NULL,\n" + " value_current BLOB,\n" + " value_signed BLOB\n" + //" PRIMARY KEY (channel_id, name(255))\n" + ");"); + + STMT_RUN ("CREATE TABLE IF NOT EXISTS state_sync (\n" + " channel_id BIGINT UNSIGNED NOT NULL REFERENCES channels(id),\n" + " name TEXT NOT NULL,\n" + " value BLOB\n" + //" PRIMARY KEY (channel_id, name(255))\n" + ");"); +#undef STMT_RUN + + /* Prepare statements */ +#define PREP(stmt,handle) \ + if (GNUNET_OK != mysql_prepare (plugin->mc, stmt, handle)) \ + { \ + GNUNET_break (0); \ + return GNUNET_SYSERR; \ + } + PREP ("INSERT IGNORE INTO channels (pub_key) VALUES (?);", + &plugin->insert_channel_key); + PREP ("INSERT IGNORE INTO slaves (pub_key) VALUES (?);", + &plugin->insert_slave_key); + PREP ("INSERT INTO membership\n" + " (channel_id, slave_id, did_join, announced_at,\n" + " effective_since, group_generation)\n" + "VALUES ((SELECT id FROM channels WHERE pub_key = ?),\n" + " (SELECT id FROM slaves WHERE pub_key = ?),\n" + " ?, ?, ?, ?);", + &plugin->insert_membership); + PREP ("SELECT did_join FROM membership\n" + "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n" + " AND slave_id = (SELECT id FROM slaves WHERE pub_key = ?)\n" + " AND effective_since <= ? AND did_join = 1\n" + "ORDER BY announced_at DESC LIMIT 1;", + &plugin->select_membership); + + PREP ("INSERT IGNORE INTO messages\n" + " (channel_id, hop_counter, signature, purpose,\n" + " fragment_id, fragment_offset, message_id,\n" + " group_generation, multicast_flags, psycstore_flags, data)\n" + "VALUES ((SELECT id FROM channels WHERE pub_key = ?),\n" + " ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);", + &plugin->insert_fragment); + + PREP ("UPDATE messages\n" + "SET psycstore_flags = psycstore_flags | ?\n" + "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n" + " AND message_id = ? AND fragment_offset = 0;", + &plugin->update_message_flags); + + PREP ("SELECT hop_counter, signature, purpose, fragment_id,\n" + " fragment_offset, message_id, group_generation,\n" + " multicast_flags, psycstore_flags, data\n" + "FROM messages\n" + "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n" + " AND ? <= fragment_id AND fragment_id <= ? LIMIT 1;", + &plugin->select_fragments); + + /** @todo select_messages: add method_prefix filter */ + PREP ("SELECT hop_counter, signature, purpose, fragment_id,\n" + " fragment_offset, message_id, group_generation,\n" + " multicast_flags, psycstore_flags, data\n" + "FROM messages\n" + "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n" + " AND ? <= message_id AND message_id <= ?\n" + "LIMIT ?;", + &plugin->select_messages); + + PREP ("SELECT * FROM\n" + "(SELECT hop_counter, signature, purpose, fragment_id,\n" + " fragment_offset, message_id, group_generation,\n" + " multicast_flags, psycstore_flags, data\n" + " FROM messages\n" + " WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n" + " ORDER BY fragment_id DESC\n" + " LIMIT ?)\n" + "ORDER BY fragment_id;", + &plugin->select_latest_fragments); + + /** @todo select_latest_messages: add method_prefix filter */ + PREP ("SELECT hop_counter, signature, purpose, fragment_id,\n" + " fragment_offset, message_id, group_generation,\n" + " multicast_flags, psycstore_flags, data\n" + "FROM messages\n" + "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n" + " AND message_id IN\n" + " (SELECT message_id\n" + " FROM messages\n" + " WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n" + " GROUP BY message_id\n" + " ORDER BY message_id\n" + " DESC LIMIT ?)\n" + "ORDER BY fragment_id;", + &plugin->select_latest_messages); + + PREP ("SELECT hop_counter, signature, purpose, fragment_id,\n" + " fragment_offset, message_id, group_generation,\n" + " multicast_flags, psycstore_flags, data\n" + "FROM messages\n" + "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n" + " AND message_id = ? AND fragment_offset = ?;", + &plugin->select_message_fragment); + + PREP ("SELECT fragment_id, message_id, group_generation\n" + "FROM messages\n" + "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n" + "ORDER BY fragment_id DESC LIMIT 1;", + &plugin->select_counters_message); + + PREP ("SELECT max_state_message_id\n" + "FROM channels\n" + "WHERE pub_key = ? AND max_state_message_id IS NOT NULL;", + &plugin->select_counters_state); + + PREP ("UPDATE channels\n" + "SET max_state_message_id = ?\n" + "WHERE pub_key = ?;", + &plugin->update_max_state_message_id); + + PREP ("UPDATE channels\n" + "SET state_hash_message_id = ?\n" + "WHERE pub_key = ?;", + &plugin->update_state_hash_message_id); + + PREP ("REPLACE INTO state\n" + " (channel_id, name, value_current, value_signed)\n" + "SELECT new.channel_id, new.name, new.value_current, old.value_signed\n" + "FROM (SELECT (SELECT id FROM channels WHERE pub_key = ?) AS channel_id,\n" + " (SELECT ?) AS name,\n" + " (SELECT ?) AS value_current\n" + " ) AS new\n" + "LEFT JOIN (SELECT channel_id, name, value_signed\n" + " FROM state) AS old\n" + "ON new.channel_id = old.channel_id AND new.name = old.name;", + &plugin->insert_state_current); + + PREP ("DELETE FROM state\n" + "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n" + " AND (value_current IS NULL OR length(value_current) = 0)\n" + " AND (value_signed IS NULL OR length(value_signed) = 0);", + &plugin->delete_state_empty); + + PREP ("UPDATE state\n" + "SET value_signed = value_current\n" + "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?);", + &plugin->update_state_signed); + + PREP ("DELETE FROM state\n" + "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?);", + &plugin->delete_state); + + PREP ("INSERT INTO state_sync (channel_id, name, value)\n" + "VALUES ((SELECT id FROM channels WHERE pub_key = ?), ?, ?);", + &plugin->insert_state_sync); + + PREP ("INSERT INTO state\n" + " (channel_id, name, value_current, value_signed)\n" + "SELECT channel_id, name, value, value\n" + "FROM state_sync\n" + "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?);", + &plugin->insert_state_from_sync); + + PREP ("DELETE FROM state_sync\n" + "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?);", + &plugin->delete_state_sync); + + PREP ("SELECT value_current\n" + "FROM state\n" + "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n" + " AND name = ?;", + &plugin->select_state_one); + + PREP ("SELECT name, value_current\n" + "FROM state\n" + "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n" + " AND (name = ? OR substr(name, 1, ?) = ?);", + &plugin->select_state_prefix); + + PREP ("SELECT name, value_signed\n" + "FROM state\n" + "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)" + " AND value_signed IS NOT NULL;", + &plugin->select_state_signed); +#undef PREP + + return GNUNET_OK; +} + + +/** + * Shutdown database connection and associate data + * structures. + * @param plugin the plugin context (state for this module) + */ +static void +database_shutdown (struct Plugin *plugin) +{ + GNUNET_MYSQL_context_destroy (plugin->mc); +} + + +/** + * Execute a prepared statement with a @a channel_key argument. + * + * @param plugin Plugin handle. + * @param stmt Statement to execute. + * @param channel_key Public key of the channel. + * + * @return #GNUNET_OK on success, else #GNUNET_SYSERR + */ +static int +exec_channel (struct Plugin *plugin, struct GNUNET_MYSQL_StatementHandle *stmt, + const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key) +{ + struct GNUNET_MY_QueryParam params[] = { + GNUNET_MY_query_param_auto_from_type (channel_key), + GNUNET_MY_query_param_end + }; + + if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc, stmt, params)) + { + LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, + "mysql exec_channel", stmt); + } + + if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt))) + { + LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, + "mysql_stmt_reset", stmt); + return GNUNET_SYSERR; + } + + return GNUNET_OK; +} + + +/** + * Begin a transaction. + */ +static int +transaction_begin (struct Plugin *plugin, enum Transactions transaction) +{ + if (GNUNET_OK != GNUNET_MYSQL_statement_run (plugin->mc, "BEGIN")) + { + LOG(GNUNET_ERROR_TYPE_ERROR, "transaction_begin failed"); + return GNUNET_SYSERR; + } + + plugin->transaction = transaction; + return GNUNET_OK; +} + + +/** + * Commit current transaction. + */ +static int +transaction_commit (struct Plugin *plugin) +{ + if (GNUNET_OK != GNUNET_MYSQL_statement_run (plugin->mc, "COMMIT")) + { + LOG(GNUNET_ERROR_TYPE_ERROR, "transaction_commit failed"); + return GNUNET_SYSERR; + } + + plugin->transaction = TRANSACTION_NONE; + return GNUNET_OK; +} + + +/** + * Roll back current transaction. + */ +static int +transaction_rollback (struct Plugin *plugin) +{ + if (GNUNET_OK != GNUNET_MYSQL_statement_run (plugin->mc, "ROLLBACK")) + { + LOG(GNUNET_ERROR_TYPE_ERROR, "transaction_rollback failed"); + return GNUNET_SYSERR; + } + + plugin->transaction = TRANSACTION_NONE; + return GNUNET_OK; +} + + +static int +channel_key_store (struct Plugin *plugin, + const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key) +{ + struct GNUNET_MYSQL_StatementHandle *stmt = plugin->insert_channel_key; + + struct GNUNET_MY_QueryParam params[] = { + GNUNET_MY_query_param_auto_from_type (channel_key), + GNUNET_MY_query_param_end + }; + + if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc, stmt, params)) + { + LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, + "mysql exec_prepared", stmt); + return GNUNET_SYSERR; + } + + if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt))) + { + LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, + "mysql_stmt_reset", stmt); + return GNUNET_SYSERR; + } + + return GNUNET_OK; +} + + +static int +slave_key_store (struct Plugin *plugin, + const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key) +{ + struct GNUNET_MYSQL_StatementHandle *stmt = plugin->insert_slave_key; + + struct GNUNET_MY_QueryParam params[] = { + GNUNET_MY_query_param_auto_from_type (slave_key), + GNUNET_MY_query_param_end + }; + + if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc, stmt, params)) + { + LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, + "mysql exec_prepared", stmt); + return GNUNET_SYSERR; + } + + if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt))) + { + LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, + "mysql_stmt_reset", stmt); + return GNUNET_SYSERR; + } + + return GNUNET_OK; +} + + +/** + * Store join/leave events for a PSYC channel in order to be able to answer + * membership test queries later. + * + * @see GNUNET_PSYCSTORE_membership_store() + * + * @return #GNUNET_OK on success, else #GNUNET_SYSERR + */ +static int +mysql_membership_store (void *cls, + const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, + const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key, + int did_join, + uint64_t announced_at, + uint64_t effective_since, + uint64_t group_generation) +{ + struct Plugin *plugin = cls; + + uint32_t idid_join = (uint32_t)did_join; + + struct GNUNET_MYSQL_StatementHandle *stmt = plugin->insert_membership; + + GNUNET_assert (TRANSACTION_NONE == plugin->transaction); + + if (announced_at > INT64_MAX || + effective_since > INT64_MAX || + group_generation > INT64_MAX) + { + GNUNET_break (0); + return GNUNET_SYSERR; + } + + if (GNUNET_OK != channel_key_store (plugin, channel_key) + || GNUNET_OK != slave_key_store (plugin, slave_key)) + return GNUNET_SYSERR; + + struct GNUNET_MY_QueryParam params[] = { + GNUNET_MY_query_param_auto_from_type (channel_key), + GNUNET_MY_query_param_auto_from_type (slave_key), + GNUNET_MY_query_param_uint32 (&idid_join), + GNUNET_MY_query_param_uint64 (&announced_at), + GNUNET_MY_query_param_uint64 (&effective_since), + GNUNET_MY_query_param_uint64 (&group_generation), + GNUNET_MY_query_param_end + }; + + if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc, stmt, params)) + { + LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, + "mysql exec_prepared", stmt); + return GNUNET_SYSERR; + } + + if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt))) + { + LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, + "mysql_stmt_reset", stmt); + return GNUNET_SYSERR; + } + return GNUNET_OK; +} + +/** + * Test if a member was admitted to the channel at the given message ID. + * + * @see GNUNET_PSYCSTORE_membership_test() + * + * @return #GNUNET_YES if the member was admitted, #GNUNET_NO if not, + * #GNUNET_SYSERR if there was en error. + */ +static int +membership_test (void *cls, + const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, + const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key, + uint64_t message_id) +{ + struct Plugin *plugin = cls; + + struct GNUNET_MYSQL_StatementHandle *stmt = plugin->select_membership; + + uint32_t did_join = 0; + + int ret = GNUNET_SYSERR; + + struct GNUNET_MY_QueryParam params_select[] = { + GNUNET_MY_query_param_auto_from_type (channel_key), + GNUNET_MY_query_param_auto_from_type (slave_key), + GNUNET_MY_query_param_uint64 (&message_id), + GNUNET_MY_query_param_end + }; + + if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc, stmt, params_select)) + { + LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, + "mysql execute prepared", stmt); + return GNUNET_SYSERR; + } + + struct GNUNET_MY_ResultSpec results_select[] = { + GNUNET_MY_result_spec_uint32 (&did_join), + GNUNET_MY_result_spec_end + }; + + switch (GNUNET_MY_extract_result (stmt, results_select)) + { + case GNUNET_NO: + ret = GNUNET_NO; + break; + case GNUNET_OK: + ret = GNUNET_YES; + break; + default: + LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, + "mysql extract_result", stmt); + return GNUNET_SYSERR; + } + + if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt))) + { + LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, + "mysql_stmt_reset", stmt); + return GNUNET_SYSERR; + } + + return ret; +} + +/** + * Store a message fragment sent to a channel. + * + * @see GNUNET_PSYCSTORE_fragment_store() + * + * @return #GNUNET_OK on success, else #GNUNET_SYSERR + */ +static int +fragment_store (void *cls, + const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, + const struct GNUNET_MULTICAST_MessageHeader *msg, + uint32_t psycstore_flags) +{ + struct Plugin *plugin = cls; + + struct GNUNET_MYSQL_StatementHandle *stmt = plugin->insert_fragment; + + GNUNET_assert (TRANSACTION_NONE == plugin->transaction); + + uint64_t fragment_id = GNUNET_ntohll (msg->fragment_id); + + uint64_t fragment_offset = GNUNET_ntohll (msg->fragment_offset); + uint64_t message_id = GNUNET_ntohll (msg->message_id); + uint64_t group_generation = GNUNET_ntohll (msg->group_generation); + + uint64_t hop_counter = ntohl(msg->hop_counter); + uint64_t flags = ntohl(msg->flags); + + if (fragment_id > INT64_MAX || fragment_offset > INT64_MAX || + message_id > INT64_MAX || group_generation > INT64_MAX) + { + LOG(GNUNET_ERROR_TYPE_ERROR, + "Tried to store fragment with a field > INT64_MAX: " + "%lu, %lu, %lu, %lu\n", fragment_id, fragment_offset, + message_id, group_generation); + GNUNET_break (0); + return GNUNET_SYSERR; + } + + if (GNUNET_OK != channel_key_store (plugin, channel_key)) + return GNUNET_SYSERR; + + struct GNUNET_MY_QueryParam params_insert[] = { + GNUNET_MY_query_param_auto_from_type (channel_key), + GNUNET_MY_query_param_uint64 (&hop_counter), + GNUNET_MY_query_param_auto_from_type (&msg->signature), + GNUNET_MY_query_param_auto_from_type (&msg->purpose), + GNUNET_MY_query_param_uint64 (&fragment_id), + GNUNET_MY_query_param_uint64 (&fragment_offset), + GNUNET_MY_query_param_uint64 (&message_id), + GNUNET_MY_query_param_uint64 (&group_generation), + GNUNET_MY_query_param_uint64 (&flags), + GNUNET_MY_query_param_uint32 (&psycstore_flags), + GNUNET_MY_query_param_fixed_size (&msg[1], ntohs (msg->header.size) + - sizeof (*msg)), + GNUNET_MY_query_param_end + }; + + if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc, stmt, params_insert)) + { + LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, + "mysql execute prepared", stmt); + return GNUNET_SYSERR; + } + + if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt))) + { + LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, + "mysql_stmt_reset", stmt); + return GNUNET_SYSERR; + } + + return GNUNET_OK; +} + +/** + * Set additional flags for a given message. + * + * They are OR'd with any existing flags set. + * + * @return #GNUNET_OK on success, else #GNUNET_SYSERR + */ +static int +message_add_flags (void *cls, + const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, + uint64_t message_id, + uint32_t psycstore_flags) +{ + struct Plugin *plugin = cls; + struct GNUNET_MYSQL_StatementHandle *stmt = plugin->update_message_flags; + + int sql_ret; + int ret = GNUNET_SYSERR; + + struct GNUNET_MY_QueryParam params_update[] = { + GNUNET_MY_query_param_uint32 (&psycstore_flags), + GNUNET_MY_query_param_auto_from_type (channel_key), + GNUNET_MY_query_param_uint64 (&message_id), + GNUNET_MY_query_param_end + }; + + sql_ret = GNUNET_MY_exec_prepared (plugin->mc, stmt, params_update); + switch (sql_ret) + { + case GNUNET_OK: + ret = GNUNET_OK; + break; + + default: + LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, + "mysql execute prepared", stmt); + } + + if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt))) + { + LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, + "mysql_stmt_reset", stmt); + return GNUNET_SYSERR; + } + + return ret; +} + + +static int +fragment_row (struct GNUNET_MYSQL_StatementHandle *stmt, + GNUNET_PSYCSTORE_FragmentCallback cb, + void *cb_cls, + uint64_t *returned_fragments) +{ + + uint32_t hop_counter; + void *signature = NULL; + void *purpose = NULL; + size_t signature_size; + size_t purpose_size; + uint64_t fragment_id; + uint64_t fragment_offset; + uint64_t message_id; + uint64_t group_generation; + uint64_t flags; + void *buf; + size_t buf_size; + int ret = GNUNET_SYSERR; + int sql_ret; + struct GNUNET_MULTICAST_MessageHeader *mp; + uint64_t msg_flags; + struct GNUNET_MY_ResultSpec results[] = { + GNUNET_MY_result_spec_uint32 (&hop_counter), + GNUNET_MY_result_spec_variable_size (&signature, &signature_size), + GNUNET_MY_result_spec_variable_size (&purpose, &purpose_size), + GNUNET_MY_result_spec_uint64 (&fragment_id), + GNUNET_MY_result_spec_uint64 (&fragment_offset), + GNUNET_MY_result_spec_uint64 (&message_id), + GNUNET_MY_result_spec_uint64 (&group_generation), + GNUNET_MY_result_spec_uint64 (&msg_flags), + GNUNET_MY_result_spec_uint64 (&flags), + GNUNET_MY_result_spec_variable_size (&buf, + &buf_size), + GNUNET_MY_result_spec_end + }; + + do + { + sql_ret = GNUNET_MY_extract_result (stmt, results); + switch (sql_ret) + { + case GNUNET_NO: + if (ret != GNUNET_YES) + ret = GNUNET_NO; + break; + + case GNUNET_YES: + mp = GNUNET_malloc (sizeof (*mp) + buf_size); + + mp->header.size = htons (sizeof (*mp) + buf_size); + mp->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE); + mp->hop_counter = htonl (hop_counter); + GNUNET_memcpy (&mp->signature, + signature, + signature_size); + GNUNET_memcpy (&mp->purpose, + purpose, + purpose_size); + mp->fragment_id = GNUNET_htonll (fragment_id); + mp->fragment_offset = GNUNET_htonll (fragment_offset); + mp->message_id = GNUNET_htonll (message_id); + mp->group_generation = GNUNET_htonll (group_generation); + mp->flags = htonl(msg_flags); + + GNUNET_memcpy (&mp[1], + buf, + buf_size); + ret = cb (cb_cls, mp, (enum GNUNET_PSYCSTORE_MessageFlags) flags); + if (NULL != returned_fragments) + (*returned_fragments)++; + GNUNET_MY_cleanup_result (results); + break; + + default: + LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, + "mysql extract_result", stmt); + } + } + while (GNUNET_YES == sql_ret); + + // for debugging + if (GNUNET_NO == ret) + GNUNET_log (GNUNET_ERROR_TYPE_WARNING | GNUNET_ERROR_TYPE_BULK, + "Empty result set\n"); + + return ret; +} + + +static int +fragment_select (struct Plugin *plugin, + struct GNUNET_MYSQL_StatementHandle *stmt, + struct GNUNET_MY_QueryParam *params, + uint64_t *returned_fragments, + GNUNET_PSYCSTORE_FragmentCallback cb, + void *cb_cls) +{ + int ret = GNUNET_SYSERR; + int sql_ret; + + sql_ret = GNUNET_MY_exec_prepared (plugin->mc, stmt, params); + switch (sql_ret) + { + case GNUNET_NO: + if (ret != GNUNET_YES) + ret = GNUNET_NO; + break; + + case GNUNET_YES: + ret = fragment_row (stmt, cb, cb_cls, returned_fragments); + break; + + default: + LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, + "mysql exec_prepared", stmt); + } + return ret; +} + + +/** + * Retrieve a message fragment range by fragment ID. + * + * @see GNUNET_PSYCSTORE_fragment_get() + * + * @return #GNUNET_OK on success, else #GNUNET_SYSERR + */ +static int +fragment_get (void *cls, + const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, + uint64_t first_fragment_id, + uint64_t last_fragment_id, + uint64_t *returned_fragments, + GNUNET_PSYCSTORE_FragmentCallback cb, + void *cb_cls) +{ + struct Plugin *plugin = cls; + struct GNUNET_MYSQL_StatementHandle *stmt = plugin->select_fragments; + int ret = GNUNET_SYSERR; + struct GNUNET_MY_QueryParam params_select[] = { + GNUNET_MY_query_param_auto_from_type (channel_key), + GNUNET_MY_query_param_uint64 (&first_fragment_id), + GNUNET_MY_query_param_uint64 (&last_fragment_id), + GNUNET_MY_query_param_end + }; + + *returned_fragments = 0; + ret = fragment_select (plugin, stmt, params_select, returned_fragments, cb, cb_cls); + + if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt))) + { + LOG_MYSQL (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, + "mysql_stmt_reset", stmt); + return GNUNET_SYSERR; + } + + return ret; +} + + +/** + * Retrieve a message fragment range by fragment ID. + * + * @see GNUNET_PSYCSTORE_fragment_get_latest() + * + * @return #GNUNET_OK on success, else #GNUNET_SYSERR + */ +static int +fragment_get_latest (void *cls, + const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, + uint64_t fragment_limit, + uint64_t *returned_fragments, + GNUNET_PSYCSTORE_FragmentCallback cb, + void *cb_cls) +{ + struct Plugin *plugin = cls; + + struct GNUNET_MYSQL_StatementHandle *stmt = plugin->select_latest_fragments; + + int ret = GNUNET_SYSERR; + *returned_fragments = 0; + + struct GNUNET_MY_QueryParam params_select[] = { + GNUNET_MY_query_param_auto_from_type (channel_key), + GNUNET_MY_query_param_uint64 (&fragment_limit), + GNUNET_MY_query_param_end + }; + + ret = fragment_select (plugin, stmt, params_select, returned_fragments, cb, cb_cls); + + if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt))) + { + LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, + "mysql_stmt_reset", stmt); + return GNUNET_SYSERR; + } + + return ret; +} + + +/** + * Retrieve all fragments of a message ID range. + * + * @see GNUNET_PSYCSTORE_message_get() + * + * @return #GNUNET_OK on success, else #GNUNET_SYSERR + */ +static int +message_get (void *cls, + const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, + uint64_t first_message_id, + uint64_t last_message_id, + uint64_t fragment_limit, + uint64_t *returned_fragments, + GNUNET_PSYCSTORE_FragmentCallback cb, + void *cb_cls) +{ + struct Plugin *plugin = cls; + struct GNUNET_MYSQL_StatementHandle *stmt = plugin->select_messages; + int ret; + + if (0 == fragment_limit) + fragment_limit = UINT64_MAX; + + struct GNUNET_MY_QueryParam params_select[] = { + GNUNET_MY_query_param_auto_from_type (channel_key), + GNUNET_MY_query_param_uint64 (&first_message_id), + GNUNET_MY_query_param_uint64 (&last_message_id), + GNUNET_MY_query_param_uint64 (&fragment_limit), + GNUNET_MY_query_param_end + }; + + *returned_fragments = 0; + ret = fragment_select (plugin, stmt, params_select, returned_fragments, cb, cb_cls); + + if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt))) + { + LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, + "mysql_stmt_reset", stmt); + return GNUNET_SYSERR; + } + + return ret; +} + + +/** + * Retrieve all fragments of the latest messages. + * + * @see GNUNET_PSYCSTORE_message_get_latest() + * + * @return #GNUNET_OK on success, else #GNUNET_SYSERR + */ +static int +message_get_latest (void *cls, + const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, + uint64_t message_limit, + uint64_t *returned_fragments, + GNUNET_PSYCSTORE_FragmentCallback cb, + void *cb_cls) +{ + struct Plugin *plugin = cls; + + struct GNUNET_MYSQL_StatementHandle *stmt = plugin->select_latest_messages; + + int ret = GNUNET_SYSERR; + *returned_fragments = 0; + + struct GNUNET_MY_QueryParam params_select[] = { + GNUNET_MY_query_param_auto_from_type (channel_key), + GNUNET_MY_query_param_auto_from_type (channel_key), + GNUNET_MY_query_param_uint64 (&message_limit), + GNUNET_MY_query_param_end + }; + + ret = fragment_select (plugin, stmt, params_select, returned_fragments, cb, cb_cls); + + if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt))) + { + LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, + "mysql_stmt_reset", stmt); + return GNUNET_SYSERR; + } + + return ret; +} + + +/** + * Retrieve a fragment of message specified by its message ID and fragment + * offset. + * + * @see GNUNET_PSYCSTORE_message_get_fragment() + * + * @return #GNUNET_OK on success, else #GNUNET_SYSERR + */ +static int +message_get_fragment (void *cls, + const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, + uint64_t message_id, + uint64_t fragment_offset, + GNUNET_PSYCSTORE_FragmentCallback cb, + void *cb_cls) +{ + struct Plugin *plugin = cls; + struct GNUNET_MYSQL_StatementHandle *stmt = plugin->select_message_fragment; + int sql_ret; + int ret = GNUNET_SYSERR; + + struct GNUNET_MY_QueryParam params_select[] = { + GNUNET_MY_query_param_auto_from_type (channel_key), + GNUNET_MY_query_param_uint64 (&message_id), + GNUNET_MY_query_param_uint64 (&fragment_offset), + GNUNET_MY_query_param_end + }; + + sql_ret = GNUNET_MY_exec_prepared (plugin->mc, stmt, params_select); + switch (sql_ret) + { + case GNUNET_NO: + ret = GNUNET_NO; + break; + + case GNUNET_OK: + ret = fragment_row (stmt, cb, cb_cls, NULL); + break; + + default: + LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, + "mysql execute prepared", stmt); + } + + if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt))) + { + LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, + "mysql_stmt_reset", stmt); + return GNUNET_SYSERR; + } + + return ret; +} + +/** + * Retrieve the max. values of message counters for a channel. + * + * @see GNUNET_PSYCSTORE_counters_get() + * + * @return #GNUNET_OK on success, else #GNUNET_SYSERR + */ +static int +counters_message_get (void *cls, + const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, + uint64_t *max_fragment_id, + uint64_t *max_message_id, + uint64_t *max_group_generation) +{ + struct Plugin *plugin = cls; + + struct GNUNET_MYSQL_StatementHandle *stmt = plugin->select_counters_message; + + int ret = GNUNET_SYSERR; + + struct GNUNET_MY_QueryParam params_select[] = { + GNUNET_MY_query_param_auto_from_type (channel_key), + GNUNET_MY_query_param_end + }; + + if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc, stmt, params_select)) + { + LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, + "mysql execute prepared", stmt); + return GNUNET_SYSERR; + } + + struct GNUNET_MY_ResultSpec results_select[] = { + GNUNET_MY_result_spec_uint64 (max_fragment_id), + GNUNET_MY_result_spec_uint64 (max_message_id), + GNUNET_MY_result_spec_uint64 (max_group_generation), + GNUNET_MY_result_spec_end + }; + + ret = GNUNET_MY_extract_result (stmt, results_select); + + if (GNUNET_OK != ret) + { + LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, + "mysql extract_result", stmt); + return GNUNET_SYSERR; + } + + if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt))) + { + LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, + "mysql_stmt_reset", stmt); + return GNUNET_SYSERR; + } + + return ret; +} + +/** + * Retrieve the max. values of state counters for a channel. + * + * @see GNUNET_PSYCSTORE_counters_get() + * + * @return #GNUNET_OK on success, else #GNUNET_SYSERR + */ +static int +counters_state_get (void *cls, + const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, + uint64_t *max_state_message_id) +{ + struct Plugin *plugin = cls; + + struct GNUNET_MYSQL_StatementHandle *stmt = plugin->select_counters_state; + + int ret = GNUNET_SYSERR; + + struct GNUNET_MY_QueryParam params_select[] = { + GNUNET_MY_query_param_auto_from_type (channel_key), + GNUNET_MY_query_param_end + }; + + if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc, stmt, params_select)) + { + LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, + "mysql execute prepared", stmt); + return GNUNET_SYSERR; + } + + struct GNUNET_MY_ResultSpec results_select[] = { + GNUNET_MY_result_spec_uint64 (max_state_message_id), + GNUNET_MY_result_spec_end + }; + + ret = GNUNET_MY_extract_result (stmt, results_select); + + if (GNUNET_OK != ret) + { + LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, + "mysql extract_result", stmt); + return GNUNET_SYSERR; + } + + if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt))) + { + LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, + "mysql_stmt_reset", stmt); + return GNUNET_SYSERR; + } + + return ret; +} + + +/** + * Assign a value to a state variable. + * + * @return #GNUNET_OK on success, else #GNUNET_SYSERR + */ +static int +state_assign (struct Plugin *plugin, struct GNUNET_MYSQL_StatementHandle *stmt, + const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, + const char *name, const void *value, size_t value_size) +{ + int ret = GNUNET_SYSERR; + + struct GNUNET_MY_QueryParam params[] = { + GNUNET_MY_query_param_auto_from_type (channel_key), + GNUNET_MY_query_param_string (name), + GNUNET_MY_query_param_fixed_size(value, value_size), + GNUNET_MY_query_param_end + }; + + ret = GNUNET_MY_exec_prepared (plugin->mc, stmt, params); + if (GNUNET_OK != ret) + { + LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, + "mysql exec_prepared", stmt); + return GNUNET_SYSERR; + } + + if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt))) + { + LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, + "mysql_stmt_reset", stmt); + return GNUNET_SYSERR; + } + + return ret; +} + + +static int +update_message_id (struct Plugin *plugin, struct GNUNET_MYSQL_StatementHandle *stmt, + const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, + uint64_t message_id) +{ + struct GNUNET_MY_QueryParam params[] = { + GNUNET_MY_query_param_uint64 (&message_id), + GNUNET_MY_query_param_auto_from_type (channel_key), + GNUNET_MY_query_param_end + }; + + if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc, + stmt, + params)) + { + LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, + "mysql execute prepared", stmt); + return GNUNET_SYSERR; + } + + if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt))) + { + LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, + "mysql_stmt_reset", stmt); + return GNUNET_SYSERR; + } + + return GNUNET_OK; +} + + +/** + * Begin modifying current state. + */ +static int +state_modify_begin (void *cls, + const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, + uint64_t message_id, uint64_t state_delta) +{ + struct Plugin *plugin = cls; + + if (state_delta > 0) + { + /** + * We can only apply state modifiers in the current message if modifiers in + * the previous stateful message (message_id - state_delta) were already + * applied. + */ + + uint64_t max_state_message_id = 0; + int ret = counters_state_get (plugin, channel_key, &max_state_message_id); + switch (ret) + { + case GNUNET_OK: + case GNUNET_NO: // no state yet + ret = GNUNET_OK; + break; + default: + return ret; + } + + if (max_state_message_id < message_id - state_delta) + return GNUNET_NO; /* some stateful messages not yet applied */ + else if (message_id - state_delta < max_state_message_id) + return GNUNET_NO; /* changes already applied */ + } + + if (TRANSACTION_NONE != plugin->transaction) + { + /** @todo FIXME: wait for other transaction to finish */ + return GNUNET_SYSERR; + } + return transaction_begin (plugin, TRANSACTION_STATE_MODIFY); +} + + +/** + * Set the current value of state variable. + * + * @see GNUNET_PSYCSTORE_state_modify() + * + * @return #GNUNET_OK on success, else #GNUNET_SYSERR + */ +static int +state_modify_op (void *cls, + const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, + enum GNUNET_PSYC_Operator op, + const char *name, const void *value, size_t value_size) +{ + struct Plugin *plugin = cls; + GNUNET_assert (TRANSACTION_STATE_MODIFY == plugin->transaction); + + switch (op) + { + case GNUNET_PSYC_OP_ASSIGN: + return state_assign (plugin, plugin->insert_state_current, + channel_key, name, value, value_size); + + default: /** @todo implement more state operations */ + GNUNET_break (0); + return GNUNET_SYSERR; + } +} + + +/** + * End modifying current state. + */ +static int +state_modify_end (void *cls, + const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, + uint64_t message_id) +{ + struct Plugin *plugin = cls; + GNUNET_assert (TRANSACTION_STATE_MODIFY == plugin->transaction); + + return + GNUNET_OK == exec_channel (plugin, plugin->delete_state_empty, channel_key) + && GNUNET_OK == update_message_id (plugin, + plugin->update_max_state_message_id, + channel_key, message_id) + && GNUNET_OK == transaction_commit (plugin) + ? GNUNET_OK : GNUNET_SYSERR; +} + + +/** + * Begin state synchronization. + */ +static int +state_sync_begin (void *cls, + const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key) +{ + struct Plugin *plugin = cls; + return exec_channel (plugin, plugin->delete_state_sync, channel_key); +} + + +/** + * Assign current value of a state variable. + * + * @see GNUNET_PSYCSTORE_state_modify() + * + * @return #GNUNET_OK on success, else #GNUNET_SYSERR + */ +static int +state_sync_assign (void *cls, + const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, + const char *name, const void *value, size_t value_size) +{ + struct Plugin *plugin = cls; + return state_assign (cls, plugin->insert_state_sync, + channel_key, name, value, value_size); +} + + +/** + * End modifying current state. + */ +static int +state_sync_end (void *cls, + const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, + uint64_t max_state_message_id, + uint64_t state_hash_message_id) +{ + struct Plugin *plugin = cls; + int ret = GNUNET_SYSERR; + + if (TRANSACTION_NONE != plugin->transaction) + { + /** @todo FIXME: wait for other transaction to finish */ + return GNUNET_SYSERR; + } + + GNUNET_OK == transaction_begin (plugin, TRANSACTION_STATE_SYNC) + && GNUNET_OK == exec_channel (plugin, plugin->delete_state, channel_key) + && GNUNET_OK == exec_channel (plugin, plugin->insert_state_from_sync, + channel_key) + && GNUNET_OK == exec_channel (plugin, plugin->delete_state_sync, + channel_key) + && GNUNET_OK == update_message_id (plugin, + plugin->update_state_hash_message_id, + channel_key, state_hash_message_id) + && GNUNET_OK == update_message_id (plugin, + plugin->update_max_state_message_id, + channel_key, max_state_message_id) + && GNUNET_OK == transaction_commit (plugin) + ? ret = GNUNET_OK + : transaction_rollback (plugin); + return ret; +} + + +/** + * Delete the whole state. + * + * @see GNUNET_PSYCSTORE_state_reset() + * + * @return #GNUNET_OK on success, else #GNUNET_SYSERR + */ +static int +state_reset (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key) +{ + struct Plugin *plugin = cls; + return exec_channel (plugin, plugin->delete_state, channel_key); +} + + +/** + * Update signed values of state variables in the state store. + * + * @see GNUNET_PSYCSTORE_state_hash_update() + * + * @return #GNUNET_OK on success, else #GNUNET_SYSERR + */ +static int +state_update_signed (void *cls, + const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key) +{ + struct Plugin *plugin = cls; + return exec_channel (plugin, plugin->update_state_signed, channel_key); +} + + +/** + * Retrieve a state variable by name. + * + * @see GNUNET_PSYCSTORE_state_get() + * + * @return #GNUNET_OK on success, else #GNUNET_SYSERR + */ +static int +state_get (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, + const char *name, GNUNET_PSYCSTORE_StateCallback cb, void *cb_cls) +{ + struct Plugin *plugin = cls; + int ret = GNUNET_SYSERR; + int sql_ret ; + + struct GNUNET_MYSQL_StatementHandle *stmt = plugin->select_state_one; + + struct GNUNET_MY_QueryParam params_select[] = { + GNUNET_MY_query_param_auto_from_type (channel_key), + GNUNET_MY_query_param_string (name), + GNUNET_MY_query_param_end + }; + + void *value_current = NULL; + size_t value_size = 0; + + struct GNUNET_MY_ResultSpec results[] = { + GNUNET_MY_result_spec_variable_size (&value_current, &value_size), + GNUNET_MY_result_spec_end + }; + + if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc, stmt, params_select)) + { + LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, + "mysql exec_prepared", stmt); + } + else + { + sql_ret = GNUNET_MY_extract_result (stmt, results); + switch (sql_ret) + { + case GNUNET_NO: + ret = GNUNET_NO; + break; + + case GNUNET_YES: + ret = cb (cb_cls, name, value_current, value_size); + break; + + default: + LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, + "mysql extract_result", stmt); + } + } + + if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt))) + { + LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, + "mysql_stmt_reset", stmt); + return GNUNET_SYSERR; + } + + return ret; +} + + +/** + * Retrieve all state variables for a channel with the given prefix. + * + * @see GNUNET_PSYCSTORE_state_get_prefix() + * + * @return #GNUNET_OK on success, else #GNUNET_SYSERR + */ +static int +state_get_prefix (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, + const char *name, GNUNET_PSYCSTORE_StateCallback cb, + void *cb_cls) +{ + struct Plugin *plugin = cls; + int ret = GNUNET_SYSERR; + + struct GNUNET_MYSQL_StatementHandle *stmt = plugin->select_state_prefix; + + uint32_t name_len = (uint32_t) strlen (name); + + struct GNUNET_MY_QueryParam params_select[] = { + GNUNET_MY_query_param_auto_from_type (channel_key), + GNUNET_MY_query_param_string (name), + GNUNET_MY_query_param_uint32 (&name_len), + GNUNET_MY_query_param_string (name), + GNUNET_MY_query_param_end + }; + + char *name2 = ""; + void *value_current = NULL; + size_t value_size = 0; + + struct GNUNET_MY_ResultSpec results[] = { + GNUNET_MY_result_spec_string (&name2), + GNUNET_MY_result_spec_variable_size (&value_current, &value_size), + GNUNET_MY_result_spec_end + };; + + int sql_ret; + + if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc, stmt, params_select)) + { + LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, + "mysql exec_prepared", stmt); + return GNUNET_SYSERR; + } + + do + { + sql_ret = GNUNET_MY_extract_result (stmt, results); + switch (sql_ret) + { + case GNUNET_NO: + if (ret != GNUNET_YES) + ret = GNUNET_NO; + break; + + case GNUNET_YES: + ret = cb (cb_cls, (const char *) name2, value_current, value_size); + + if (ret != GNUNET_YES) + sql_ret = GNUNET_NO; + break; + + default: + LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, + "mysql extract_result", stmt); + } + } + while (sql_ret == GNUNET_YES); + + if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt))) + { + LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, + "mysql_stmt_reset", stmt); + return GNUNET_SYSERR; + } + + return ret; +} + + +/** + * Retrieve all signed state variables for a channel. + * + * @see GNUNET_PSYCSTORE_state_get_signed() + * + * @return #GNUNET_OK on success, else #GNUNET_SYSERR + */ +static int +state_get_signed (void *cls, + const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, + GNUNET_PSYCSTORE_StateCallback cb, void *cb_cls) +{ + struct Plugin *plugin = cls; + int ret = GNUNET_SYSERR; + + struct GNUNET_MYSQL_StatementHandle *stmt = plugin->select_state_signed; + + struct GNUNET_MY_QueryParam params_select[] = { + GNUNET_MY_query_param_auto_from_type (channel_key), + GNUNET_MY_query_param_end + }; + + int sql_ret; + + char *name = ""; + void *value_signed = NULL; + size_t value_size = 0; + + struct GNUNET_MY_ResultSpec results[] = { + GNUNET_MY_result_spec_string (&name), + GNUNET_MY_result_spec_variable_size (&value_signed, &value_size), + GNUNET_MY_result_spec_end + }; + + if (GNUNET_OK != GNUNET_MY_exec_prepared (plugin->mc, stmt, params_select)) + { + LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, + "mysql exec_prepared", stmt); + return GNUNET_SYSERR; + } + + do + { + sql_ret = GNUNET_MY_extract_result (stmt, results); + switch (sql_ret) + { + case GNUNET_NO: + if (ret != GNUNET_YES) + ret = GNUNET_NO; + break; + + case GNUNET_YES: + ret = cb (cb_cls, (const char *) name, value_signed, value_size); + + if (ret != GNUNET_YES) + sql_ret = GNUNET_NO; + break; + + default: + LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, + "mysql extract_result", stmt); + } + } + while (sql_ret == GNUNET_YES); + + if (0 != mysql_stmt_reset (GNUNET_MYSQL_statement_get_stmt (stmt))) + { + LOG_MYSQL(plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, + "mysql_stmt_reset", stmt); + return GNUNET_SYSERR; + } + + return ret; +} + + +/** + * Entry point for the plugin. + * + * @param cls The struct GNUNET_CONFIGURATION_Handle. + * @return NULL on error, otherwise the plugin context + */ +void * +libgnunet_plugin_psycstore_mysql_init (void *cls) +{ + static struct Plugin plugin; + const struct GNUNET_CONFIGURATION_Handle *cfg = cls; + struct GNUNET_PSYCSTORE_PluginFunctions *api; + + if (NULL != plugin.cfg) + return NULL; /* can only initialize once! */ + memset (&plugin, 0, sizeof (struct Plugin)); + plugin.cfg = cfg; + if (GNUNET_OK != database_setup (&plugin)) + { + database_shutdown (&plugin); + return NULL; + } + api = GNUNET_new (struct GNUNET_PSYCSTORE_PluginFunctions); + api->cls = &plugin; + api->membership_store = &mysql_membership_store; + api->membership_test = &membership_test; + api->fragment_store = &fragment_store; + api->message_add_flags = &message_add_flags; + api->fragment_get = &fragment_get; + api->fragment_get_latest = &fragment_get_latest; + api->message_get = &message_get; + api->message_get_latest = &message_get_latest; + api->message_get_fragment = &message_get_fragment; + api->counters_message_get = &counters_message_get; + api->counters_state_get = &counters_state_get; + api->state_modify_begin = &state_modify_begin; + api->state_modify_op = &state_modify_op; + api->state_modify_end = &state_modify_end; + api->state_sync_begin = &state_sync_begin; + api->state_sync_assign = &state_sync_assign; + api->state_sync_end = &state_sync_end; + api->state_reset = &state_reset; + api->state_update_signed = &state_update_signed; + api->state_get = &state_get; + api->state_get_prefix = &state_get_prefix; + api->state_get_signed = &state_get_signed; + + LOG (GNUNET_ERROR_TYPE_INFO, _("Mysql database running\n")); + return api; +} + + +/** + * Exit point from the plugin. + * + * @param cls The plugin context (as returned by "init") + * @return Always NULL + */ +void * +libgnunet_plugin_psycstore_mysql_done (void *cls) +{ + struct GNUNET_PSYCSTORE_PluginFunctions *api = cls; + struct Plugin *plugin = api->cls; + + database_shutdown (plugin); + plugin->cfg = NULL; + GNUNET_free (api); + LOG (GNUNET_ERROR_TYPE_DEBUG, "Mysql plugin is finished\n"); + return NULL; +} + +/* end of plugin_psycstore_mysql.c */ diff --git a/src/psycstore/plugin_psycstore_postgres.c b/src/psycstore/plugin_psycstore_postgres.c new file mode 100644 index 0000000..33c9960 --- /dev/null +++ b/src/psycstore/plugin_psycstore_postgres.c @@ -0,0 +1,1530 @@ +/* + * This file is part of GNUnet + * Copyright (C) 2016 GNUnet e.V. + * + * GNUnet is free software: you can redistribute it and/or modify it + * under the terms of the GNU Affero General Public License as published + * by the Free Software Foundation, either version 3 of the License, + * or (at your option) any later version. + * + * GNUnet is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + + SPDX-License-Identifier: AGPL3.0-or-later + */ + +/** + * @file psycstore/plugin_psycstore_postgres.c + * @brief PostgresQL-based psycstore backend + * @author Daniel Golle + * @author Gabor X Toth + * @author Christian Grothoff + * @author Christophe Genevey + * @author Jeffrey Burdges + */ + +#include "platform.h" +#include "gnunet_psycstore_plugin.h" +#include "gnunet_psycstore_service.h" +#include "gnunet_multicast_service.h" +#include "gnunet_crypto_lib.h" +#include "gnunet_psyc_util_lib.h" +#include "psycstore.h" +#include "gnunet_pq_lib.h" + +/** + * After how many ms "busy" should a DB operation fail for good? A + * low value makes sure that we are more responsive to requests + * (especially PUTs). A high value guarantees a higher success rate + * (SELECTs in iterate can take several seconds despite LIMIT=1). + * + * The default value of 1s should ensure that users do not experience + * huge latencies while at the same time allowing operations to + * succeed with reasonable probability. + */ +#define BUSY_TIMEOUT_MS 1000 + +#define DEBUG_PSYCSTORE GNUNET_EXTRA_LOGGING + +#define LOG(kind,...) GNUNET_log_from (kind, "psycstore-postgres", __VA_ARGS__) + +enum Transactions { + TRANSACTION_NONE = 0, + TRANSACTION_STATE_MODIFY, + TRANSACTION_STATE_SYNC, +}; + +/** + * Context for all functions in this plugin. + */ +struct Plugin +{ + + const struct GNUNET_CONFIGURATION_Handle *cfg; + + /** + * Native Postgres database handle. + */ + PGconn *dbh; + + enum Transactions transaction; + + void *cls; +}; + + +/** + * Initialize the database connections and associated + * data structures (create tables and indices + * as needed as well). + * + * @param plugin the plugin context (state for this module) + * @return #GNUNET_OK on success + */ +static int +database_setup (struct Plugin *plugin) +{ + struct GNUNET_PQ_ExecuteStatement es[] = { + GNUNET_PQ_make_execute ("CREATE TABLE IF NOT EXISTS channels (\n" + " id SERIAL,\n" + " pub_key BYTEA NOT NULL CHECK (LENGTH(pub_key)=32),\n" + " max_state_message_id BIGINT,\n" + " state_hash_message_id BIGINT,\n" + " PRIMARY KEY(id)\n" + ")" + "WITH OIDS"), + GNUNET_PQ_make_execute ("CREATE UNIQUE INDEX IF NOT EXISTS channel_pub_key_idx \n" + " ON channels (pub_key)"), + GNUNET_PQ_make_execute ("CREATE OR REPLACE FUNCTION get_chan_id(BYTEA) RETURNS INTEGER AS \n" + " 'SELECT id FROM channels WHERE pub_key=$1;' LANGUAGE SQL STABLE \n" + "RETURNS NULL ON NULL INPUT"), + GNUNET_PQ_make_execute ("CREATE TABLE IF NOT EXISTS slaves (\n" + " id SERIAL,\n" + " pub_key BYTEA NOT NULL CHECK (LENGTH(pub_key)=32),\n" + " PRIMARY KEY(id)\n" + ")" + "WITH OIDS"), + GNUNET_PQ_make_execute ("CREATE UNIQUE INDEX IF NOT EXISTS slaves_pub_key_idx \n" + " ON slaves (pub_key)"), + GNUNET_PQ_make_execute ("CREATE OR REPLACE FUNCTION get_slave_id(BYTEA) RETURNS INTEGER AS \n" + " 'SELECT id FROM slaves WHERE pub_key=$1;' LANGUAGE SQL STABLE \n" + "RETURNS NULL ON NULL INPUT"), + GNUNET_PQ_make_execute ("CREATE TABLE IF NOT EXISTS membership (\n" + " channel_id BIGINT NOT NULL REFERENCES channels(id),\n" + " slave_id BIGINT NOT NULL REFERENCES slaves(id),\n" + " did_join INT NOT NULL,\n" + " announced_at BIGINT NOT NULL,\n" + " effective_since BIGINT NOT NULL,\n" + " group_generation BIGINT NOT NULL\n" + ")" + "WITH OIDS"), + GNUNET_PQ_make_execute ("CREATE INDEX IF NOT EXISTS idx_membership_channel_id_slave_id " + "ON membership (channel_id, slave_id)"), + /** @todo messages table: add method_name column */ + GNUNET_PQ_make_execute ("CREATE TABLE IF NOT EXISTS messages (\n" + " channel_id BIGINT NOT NULL REFERENCES channels(id),\n" + " hop_counter INT NOT NULL,\n" + " signature BYTEA CHECK (LENGTH(signature)=64),\n" + " purpose BYTEA CHECK (LENGTH(purpose)=8),\n" + " fragment_id BIGINT NOT NULL,\n" + " fragment_offset BIGINT NOT NULL,\n" + " message_id BIGINT NOT NULL,\n" + " group_generation BIGINT NOT NULL,\n" + " multicast_flags INT NOT NULL,\n" + " psycstore_flags INT NOT NULL,\n" + " data BYTEA,\n" + " PRIMARY KEY (channel_id, fragment_id),\n" + " UNIQUE (channel_id, message_id, fragment_offset)\n" + ")" + "WITH OIDS"), + GNUNET_PQ_make_execute ("CREATE TABLE IF NOT EXISTS state (\n" + " channel_id BIGINT NOT NULL REFERENCES channels(id),\n" + " name TEXT NOT NULL,\n" + " value_current BYTEA,\n" + " value_signed BYTEA,\n" + " PRIMARY KEY (channel_id, name)\n" + ")" + "WITH OIDS"), + GNUNET_PQ_make_execute ("CREATE TABLE IF NOT EXISTS state_sync (\n" + " channel_id BIGINT NOT NULL REFERENCES channels(id),\n" + " name TEXT NOT NULL,\n" + " value BYTEA,\n" + " PRIMARY KEY (channel_id, name)\n" + ")" + "WITH OIDS"), + GNUNET_PQ_EXECUTE_STATEMENT_END + }; + + /* Open database and precompile statements */ + plugin->dbh = GNUNET_PQ_connect_with_cfg (plugin->cfg, + "psycstore-postgres"); + if (NULL == plugin->dbh) + return GNUNET_SYSERR; + if (GNUNET_OK != + GNUNET_PQ_exec_statements (plugin->dbh, + es)) + { + PQfinish (plugin->dbh); + plugin->dbh = NULL; + return GNUNET_SYSERR; + } + + /* Prepare statements */ + { + struct GNUNET_PQ_PreparedStatement ps[] = { + GNUNET_PQ_make_prepare ("transaction_begin", + "BEGIN", 0), + GNUNET_PQ_make_prepare ("transaction_commit", + "COMMIT", 0), + GNUNET_PQ_make_prepare ("transaction_rollback", + "ROLLBACK", 0), + GNUNET_PQ_make_prepare ("insert_channel_key", + "INSERT INTO channels (pub_key) VALUES ($1)" + " ON CONFLICT DO NOTHING", 1), + GNUNET_PQ_make_prepare ("insert_slave_key", + "INSERT INTO slaves (pub_key) VALUES ($1)" + " ON CONFLICT DO NOTHING", 1), + GNUNET_PQ_make_prepare ("insert_membership", + "INSERT INTO membership\n" + " (channel_id, slave_id, did_join, announced_at,\n" + " effective_since, group_generation)\n" + "VALUES (get_chan_id($1),\n" + " get_slave_id($2),\n" + " $3, $4, $5, $6)", 6), + GNUNET_PQ_make_prepare ("select_membership", + "SELECT did_join FROM membership\n" + "WHERE channel_id = get_chan_id($1)\n" + " AND slave_id = get_slave_id($2)\n" + " AND effective_since <= $3 AND did_join = 1\n" + "ORDER BY announced_at DESC LIMIT 1", 3), + GNUNET_PQ_make_prepare ("insert_fragment", + "INSERT INTO messages\n" + " (channel_id, hop_counter, signature, purpose,\n" + " fragment_id, fragment_offset, message_id,\n" + " group_generation, multicast_flags, psycstore_flags, data)\n" + "VALUES (get_chan_id($1),\n" + " $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)" + "ON CONFLICT DO NOTHING", 11), + GNUNET_PQ_make_prepare ("update_message_flags", + "UPDATE messages\n" + "SET psycstore_flags = psycstore_flags | $1\n" + "WHERE channel_id = get_chan_id($2) \n" + " AND message_id = $3 AND fragment_offset = 0", 3), + GNUNET_PQ_make_prepare ("select_fragments", + "SELECT hop_counter, signature, purpose, fragment_id,\n" + " fragment_offset, message_id, group_generation,\n" + " multicast_flags, psycstore_flags, data\n" + "FROM messages\n" + "WHERE channel_id = get_chan_id($1) \n" + " AND $2 <= fragment_id AND fragment_id <= $3", 3), + /** @todo select_messages: add method_prefix filter */ + GNUNET_PQ_make_prepare ("select_messages", + "SELECT hop_counter, signature, purpose, fragment_id,\n" + " fragment_offset, message_id, group_generation,\n" + " multicast_flags, psycstore_flags, data\n" + "FROM messages\n" + "WHERE channel_id = get_chan_id($1) \n" + " AND $2 <= message_id AND message_id <= $3\n" + "LIMIT $4;", 4), + /** @todo select_latest_messages: add method_prefix filter */ + GNUNET_PQ_make_prepare ("select_latest_fragments", + "SELECT rev.hop_counter AS hop_counter,\n" + " rev.signature AS signature,\n" + " rev.purpose AS purpose,\n" + " rev.fragment_id AS fragment_id,\n" + " rev.fragment_offset AS fragment_offset,\n" + " rev.message_id AS message_id,\n" + " rev.group_generation AS group_generation,\n" + " rev.multicast_flags AS multicast_flags,\n" + " rev.psycstore_flags AS psycstore_flags,\n" + " rev.data AS data\n" + " FROM\n" + " (SELECT hop_counter, signature, purpose, fragment_id,\n" + " fragment_offset, message_id, group_generation,\n" + " multicast_flags, psycstore_flags, data \n" + " FROM messages\n" + " WHERE channel_id = get_chan_id($1) \n" + " ORDER BY fragment_id DESC\n" + " LIMIT $2) AS rev\n" + " ORDER BY rev.fragment_id;", 2), + GNUNET_PQ_make_prepare ("select_latest_messages", + "SELECT hop_counter, signature, purpose, fragment_id,\n" + " fragment_offset, message_id, group_generation,\n" + " multicast_flags, psycstore_flags, data\n" + "FROM messages\n" + "WHERE channel_id = get_chan_id($1)\n" + " AND message_id IN\n" + " (SELECT message_id\n" + " FROM messages\n" + " WHERE channel_id = get_chan_id($2) \n" + " GROUP BY message_id\n" + " ORDER BY message_id\n" + " DESC LIMIT $3)\n" + "ORDER BY fragment_id", 3), + GNUNET_PQ_make_prepare ("select_message_fragment", + "SELECT hop_counter, signature, purpose, fragment_id,\n" + " fragment_offset, message_id, group_generation,\n" + " multicast_flags, psycstore_flags, data\n" + "FROM messages\n" + "WHERE channel_id = get_chan_id($1) \n" + " AND message_id = $2 AND fragment_offset = $3", 3), + GNUNET_PQ_make_prepare ("select_counters_message", + "SELECT fragment_id, message_id, group_generation\n" + "FROM messages\n" + "WHERE channel_id = get_chan_id($1)\n" + "ORDER BY fragment_id DESC LIMIT 1", 1), + GNUNET_PQ_make_prepare ("select_counters_state", + "SELECT max_state_message_id\n" + "FROM channels\n" + "WHERE pub_key = $1 AND max_state_message_id IS NOT NULL", 1), + GNUNET_PQ_make_prepare ("update_max_state_message_id", + "UPDATE channels\n" + "SET max_state_message_id = $1\n" + "WHERE pub_key = $2", 2), + + GNUNET_PQ_make_prepare ("update_state_hash_message_id", + "UPDATE channels\n" + "SET state_hash_message_id = $1\n" + "WHERE pub_key = $2", 2), + GNUNET_PQ_make_prepare ("insert_state_current", + "INSERT INTO state\n" + " (channel_id, name, value_current, value_signed)\n" + "SELECT new.channel_id, new.name,\n" + " new.value_current, old.value_signed\n" + "FROM (SELECT get_chan_id($1) AS channel_id,\n" + " $2::TEXT AS name, $3::BYTEA AS value_current) AS new\n" + "LEFT JOIN (SELECT channel_id, name, value_signed\n" + " FROM state) AS old\n" + "ON new.channel_id = old.channel_id AND new.name = old.name\n" + "ON CONFLICT (channel_id, name)\n" + " DO UPDATE SET value_current = EXCLUDED.value_current,\n" + " value_signed = EXCLUDED.value_signed", 3), + GNUNET_PQ_make_prepare ("delete_state_empty", + "DELETE FROM state\n" + "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = $1)\n" + " AND (value_current IS NULL OR length(value_current) = 0)\n" + " AND (value_signed IS NULL OR length(value_signed) = 0)", 1), + GNUNET_PQ_make_prepare ("update_state_signed", + "UPDATE state\n" + "SET value_signed = value_current\n" + "WHERE channel_id = get_chan_id($1) ", 1), + GNUNET_PQ_make_prepare ("delete_state", + "DELETE FROM state\n" + "WHERE channel_id = get_chan_id($1) ", 1), + GNUNET_PQ_make_prepare ("insert_state_sync", + "INSERT INTO state_sync (channel_id, name, value)\n" + "VALUES (get_chan_id($1), $2, $3)", 3), + GNUNET_PQ_make_prepare ("insert_state_from_sync", + "INSERT INTO state\n" + " (channel_id, name, value_current, value_signed)\n" + "SELECT channel_id, name, value, value\n" + "FROM state_sync\n" + "WHERE channel_id = get_chan_id($1)", 1), + GNUNET_PQ_make_prepare ("delete_state_sync", + "DELETE FROM state_sync\n" + "WHERE channel_id = get_chan_id($1)", 1), + GNUNET_PQ_make_prepare ("select_state_one", + "SELECT value_current\n" + "FROM state\n" + "WHERE channel_id = get_chan_id($1)\n" + " AND name = $2", 2), + GNUNET_PQ_make_prepare ("select_state_prefix", + "SELECT name, value_current\n" + "FROM state\n" + "WHERE channel_id = get_chan_id($1)\n" + " AND (name = $2 OR substr(name, 1, $3) = $4)", 4), + GNUNET_PQ_make_prepare ("select_state_signed", + "SELECT name, value_signed\n" + "FROM state\n" + "WHERE channel_id = get_chan_id($1)\n" + " AND value_signed IS NOT NULL", 1), + GNUNET_PQ_PREPARED_STATEMENT_END + }; + + if (GNUNET_OK != + GNUNET_PQ_prepare_statements (plugin->dbh, + ps)) + { + PQfinish (plugin->dbh); + plugin->dbh = NULL; + return GNUNET_SYSERR; + } + } + + return GNUNET_OK; +} + + +/** + * Shutdown database connection and associate data + * structures. + * @param plugin the plugin context (state for this module) + */ +static void +database_shutdown (struct Plugin *plugin) +{ + PQfinish (plugin->dbh); + plugin->dbh = NULL; +} + + +/** + * Execute a prepared statement with a @a channel_key argument. + * + * @param plugin Plugin handle. + * @param stmt Statement to execute. + * @param channel_key Public key of the channel. + * + * @return #GNUNET_OK on success, else #GNUNET_SYSERR + */ +static int +exec_channel (struct Plugin *plugin, const char *stmt, + const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key) +{ + struct GNUNET_PQ_QueryParam params[] = { + GNUNET_PQ_query_param_auto_from_type (channel_key), + GNUNET_PQ_query_param_end + }; + + if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS != + GNUNET_PQ_eval_prepared_non_select (plugin->dbh, stmt, params)) + return GNUNET_SYSERR; + + return GNUNET_OK; +} + + +/** + * Begin a transaction. + */ +static int +transaction_begin (struct Plugin *plugin, enum Transactions transaction) +{ + struct GNUNET_PQ_QueryParam params[] = { + GNUNET_PQ_query_param_end + }; + + if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS != + GNUNET_PQ_eval_prepared_non_select (plugin->dbh, "transaction_begin", params)) + return GNUNET_SYSERR; + + plugin->transaction = transaction; + return GNUNET_OK; +} + + +/** + * Commit current transaction. + */ +static int +transaction_commit (struct Plugin *plugin) +{ + struct GNUNET_PQ_QueryParam params[] = { + GNUNET_PQ_query_param_end + }; + + if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS != + GNUNET_PQ_eval_prepared_non_select (plugin->dbh, "transaction_commit", params)) + return GNUNET_SYSERR; + + plugin->transaction = TRANSACTION_NONE; + return GNUNET_OK; +} + + +/** + * Roll back current transaction. + */ +static int +transaction_rollback (struct Plugin *plugin) +{ + struct GNUNET_PQ_QueryParam params[] = { + GNUNET_PQ_query_param_end + }; + + if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS != + GNUNET_PQ_eval_prepared_non_select (plugin->dbh, "transaction_rollback", params)) + return GNUNET_SYSERR; + + plugin->transaction = TRANSACTION_NONE; + return GNUNET_OK; +} + + +static int +channel_key_store (struct Plugin *plugin, + const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key) +{ + struct GNUNET_PQ_QueryParam params[] = { + GNUNET_PQ_query_param_auto_from_type (channel_key), + GNUNET_PQ_query_param_end + }; + + if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != + GNUNET_PQ_eval_prepared_non_select (plugin->dbh, + "insert_channel_key", + params)) + return GNUNET_SYSERR; + + return GNUNET_OK; +} + + +static int +slave_key_store (struct Plugin *plugin, + const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key) +{ + struct GNUNET_PQ_QueryParam params[] = { + GNUNET_PQ_query_param_auto_from_type (slave_key), + GNUNET_PQ_query_param_end + }; + + if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != + GNUNET_PQ_eval_prepared_non_select (plugin->dbh, "insert_slave_key", params)) + return GNUNET_SYSERR; + + return GNUNET_OK; +} + + +/** + * Store join/leave events for a PSYC channel in order to be able to answer + * membership test queries later. + * + * @see GNUNET_PSYCSTORE_membership_store() + * + * @return #GNUNET_OK on success, else #GNUNET_SYSERR + */ +static int +postgres_membership_store (void *cls, + const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, + const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key, + int did_join, + uint64_t announced_at, + uint64_t effective_since, + uint64_t group_generation) +{ + struct Plugin *plugin = cls; + uint32_t idid_join = (uint32_t) did_join; + + GNUNET_assert (TRANSACTION_NONE == plugin->transaction); + + if ( (announced_at > INT64_MAX) || + (effective_since > INT64_MAX) || + (group_generation > INT64_MAX) ) + { + GNUNET_break (0); + return GNUNET_SYSERR; + } + + if ( (GNUNET_OK != + channel_key_store (plugin, channel_key)) || + (GNUNET_OK != + slave_key_store (plugin, slave_key)) ) + return GNUNET_SYSERR; + + struct GNUNET_PQ_QueryParam params[] = { + GNUNET_PQ_query_param_auto_from_type (channel_key), + GNUNET_PQ_query_param_auto_from_type (slave_key), + GNUNET_PQ_query_param_uint32 (&idid_join), + GNUNET_PQ_query_param_uint64 (&announced_at), + GNUNET_PQ_query_param_uint64 (&effective_since), + GNUNET_PQ_query_param_uint64 (&group_generation), + GNUNET_PQ_query_param_end + }; + + if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != + GNUNET_PQ_eval_prepared_non_select (plugin->dbh, + "insert_membership", + params)) + return GNUNET_SYSERR; + + return GNUNET_OK; +} + +/** + * Test if a member was admitted to the channel at the given message ID. + * + * @see GNUNET_PSYCSTORE_membership_test() + * + * @return #GNUNET_YES if the member was admitted, #GNUNET_NO if not, + * #GNUNET_SYSERR if there was en error. + */ +static int +membership_test (void *cls, + const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, + const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key, + uint64_t message_id) +{ + struct Plugin *plugin = cls; + + uint32_t did_join = 0; + + struct GNUNET_PQ_QueryParam params_select[] = { + GNUNET_PQ_query_param_auto_from_type (channel_key), + GNUNET_PQ_query_param_auto_from_type (slave_key), + GNUNET_PQ_query_param_uint64 (&message_id), + GNUNET_PQ_query_param_end + }; + + struct GNUNET_PQ_ResultSpec results_select[] = { + GNUNET_PQ_result_spec_uint32 ("did_join", &did_join), + GNUNET_PQ_result_spec_end + }; + + if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != + GNUNET_PQ_eval_prepared_singleton_select (plugin->dbh, "select_membership", + params_select, results_select)) + return GNUNET_SYSERR; + + return GNUNET_OK; +} + +/** + * Store a message fragment sent to a channel. + * + * @see GNUNET_PSYCSTORE_fragment_store() + * + * @return #GNUNET_OK on success, else #GNUNET_SYSERR + */ +static int +fragment_store (void *cls, + const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, + const struct GNUNET_MULTICAST_MessageHeader *msg, + uint32_t psycstore_flags) +{ + struct Plugin *plugin = cls; + + GNUNET_assert (TRANSACTION_NONE == plugin->transaction); + + uint64_t fragment_id = GNUNET_ntohll (msg->fragment_id); + + uint64_t fragment_offset = GNUNET_ntohll (msg->fragment_offset); + uint64_t message_id = GNUNET_ntohll (msg->message_id); + uint64_t group_generation = GNUNET_ntohll (msg->group_generation); + + uint32_t hop_counter = ntohl(msg->hop_counter); + uint32_t flags = ntohl(msg->flags); + + if (fragment_id > INT64_MAX || fragment_offset > INT64_MAX || + message_id > INT64_MAX || group_generation > INT64_MAX) + { + LOG(GNUNET_ERROR_TYPE_ERROR, + "Tried to store fragment with a field > INT64_MAX: " + "%lu, %lu, %lu, %lu\n", fragment_id, fragment_offset, + message_id, group_generation); + GNUNET_break (0); + return GNUNET_SYSERR; + } + + if (GNUNET_OK != channel_key_store (plugin, channel_key)) + return GNUNET_SYSERR; + + struct GNUNET_PQ_QueryParam params_insert[] = { + GNUNET_PQ_query_param_auto_from_type (channel_key), + GNUNET_PQ_query_param_uint32 (&hop_counter), + GNUNET_PQ_query_param_auto_from_type (&msg->signature), + GNUNET_PQ_query_param_auto_from_type (&msg->purpose), + GNUNET_PQ_query_param_uint64 (&fragment_id), + GNUNET_PQ_query_param_uint64 (&fragment_offset), + GNUNET_PQ_query_param_uint64 (&message_id), + GNUNET_PQ_query_param_uint64 (&group_generation), + GNUNET_PQ_query_param_uint32 (&flags), + GNUNET_PQ_query_param_uint32 (&psycstore_flags), + GNUNET_PQ_query_param_fixed_size (&msg[1], ntohs (msg->header.size) - sizeof (*msg)), + GNUNET_PQ_query_param_end + }; + + if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS != + GNUNET_PQ_eval_prepared_non_select (plugin->dbh, "insert_fragment", params_insert)) + return GNUNET_SYSERR; + + return GNUNET_OK; +} + +/** + * Set additional flags for a given message. + * + * They are OR'd with any existing flags set. + * + * @return #GNUNET_OK on success, else #GNUNET_SYSERR + */ +static int +message_add_flags (void *cls, + const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, + uint64_t message_id, + uint32_t psycstore_flags) +{ + struct Plugin *plugin = cls; + + struct GNUNET_PQ_QueryParam params_update[] = { + GNUNET_PQ_query_param_uint32 (&psycstore_flags), + GNUNET_PQ_query_param_auto_from_type (channel_key), + GNUNET_PQ_query_param_uint64 (&message_id), + GNUNET_PQ_query_param_end + }; + + if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS != + GNUNET_PQ_eval_prepared_non_select (plugin->dbh, "update_message_flags", params_update)) + return GNUNET_SYSERR; + + return GNUNET_OK; +} + + +/** + * Closure for #fragment_rows. + */ +struct FragmentRowsContext { + GNUNET_PSYCSTORE_FragmentCallback cb; + void *cb_cls; + + uint64_t *returned_fragments; + + /* I preserved this but I do not see the point since + * it cannot stop the loop early and gets overwritten ?? */ + int ret; +}; + + +/** + * Callback that retrieves the results of a SELECT statement + * reading form the messages table. + * + * Only passed to GNUNET_PQ_eval_prepared_multi_select and + * has type GNUNET_PQ_PostgresResultHandler. + * + * @param cls closure + * @param result the postgres result + * @param num_result the number of results in @a result + */ +void fragment_rows (void *cls, + PGresult *res, + unsigned int num_results) +{ + struct FragmentRowsContext *c = cls; + + for (unsigned int i=0;i<num_results;i++) + { + uint32_t hop_counter; + void *signature = NULL; + void *purpose = NULL; + size_t signature_size; + size_t purpose_size; + uint64_t fragment_id; + uint64_t fragment_offset; + uint64_t message_id; + uint64_t group_generation; + uint32_t flags; + void *buf; + size_t buf_size; + uint32_t msg_flags; + struct GNUNET_PQ_ResultSpec results[] = { + GNUNET_PQ_result_spec_uint32 ("hop_counter", &hop_counter), + GNUNET_PQ_result_spec_variable_size ("signature", &signature, &signature_size), + GNUNET_PQ_result_spec_variable_size ("purpose", &purpose, &purpose_size), + GNUNET_PQ_result_spec_uint64 ("fragment_id", &fragment_id), + GNUNET_PQ_result_spec_uint64 ("fragment_offset", &fragment_offset), + GNUNET_PQ_result_spec_uint64 ("message_id", &message_id), + GNUNET_PQ_result_spec_uint64 ("group_generation", &group_generation), + GNUNET_PQ_result_spec_uint32 ("multicast_flags", &msg_flags), + GNUNET_PQ_result_spec_uint32 ("psycstore_flags", &flags), + GNUNET_PQ_result_spec_variable_size ("data", &buf, &buf_size), + GNUNET_PQ_result_spec_end + }; + struct GNUNET_MULTICAST_MessageHeader *mp; + + if (GNUNET_YES != GNUNET_PQ_extract_result (res, results, i)) + { + GNUNET_PQ_cleanup_result(results); /* missing previously, a memory leak?? */ + break; /* nothing more?? */ + } + + mp = GNUNET_malloc (sizeof (*mp) + buf_size); + + mp->header.size = htons (sizeof (*mp) + buf_size); + mp->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE); + mp->hop_counter = htonl (hop_counter); + GNUNET_memcpy (&mp->signature, + signature, signature_size); + GNUNET_memcpy (&mp->purpose, + purpose, purpose_size); + mp->fragment_id = GNUNET_htonll (fragment_id); + mp->fragment_offset = GNUNET_htonll (fragment_offset); + mp->message_id = GNUNET_htonll (message_id); + mp->group_generation = GNUNET_htonll (group_generation); + mp->flags = htonl(msg_flags); + + GNUNET_memcpy (&mp[1], + buf, buf_size); + GNUNET_PQ_cleanup_result(results); + c->ret = c->cb (c->cb_cls, mp, (enum GNUNET_PSYCSTORE_MessageFlags) flags); + if (NULL != c->returned_fragments) + (*c->returned_fragments)++; + } +} + + +static int +fragment_select (struct Plugin *plugin, + const char *stmt, + struct GNUNET_PQ_QueryParam *params, + uint64_t *returned_fragments, + GNUNET_PSYCSTORE_FragmentCallback cb, + void *cb_cls) +{ + /* Stack based closure */ + struct FragmentRowsContext frc = { + .cb = cb, + .cb_cls = cb_cls, + .returned_fragments = returned_fragments, + .ret = GNUNET_SYSERR + }; + + if (0 > GNUNET_PQ_eval_prepared_multi_select (plugin->dbh, + stmt, params, + &fragment_rows, &frc)) + return GNUNET_SYSERR; + return frc.ret; /* GNUNET_OK ?? */ +} + +/** + * Retrieve a message fragment range by fragment ID. + * + * @see GNUNET_PSYCSTORE_fragment_get() + * + * @return #GNUNET_OK on success, else #GNUNET_SYSERR + */ +static int +fragment_get (void *cls, + const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, + uint64_t first_fragment_id, + uint64_t last_fragment_id, + uint64_t *returned_fragments, + GNUNET_PSYCSTORE_FragmentCallback cb, + void *cb_cls) +{ + struct Plugin *plugin = cls; + struct GNUNET_PQ_QueryParam params_select[] = { + GNUNET_PQ_query_param_auto_from_type (channel_key), + GNUNET_PQ_query_param_uint64 (&first_fragment_id), + GNUNET_PQ_query_param_uint64 (&last_fragment_id), + GNUNET_PQ_query_param_end + }; + + *returned_fragments = 0; + return fragment_select (plugin, + "select_fragments", + params_select, + returned_fragments, + cb, cb_cls); +} + + +/** + * Retrieve a message fragment range by fragment ID. + * + * @see GNUNET_PSYCSTORE_fragment_get_latest() + * + * @return #GNUNET_OK on success, else #GNUNET_SYSERR + */ +static int +fragment_get_latest (void *cls, + const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, + uint64_t fragment_limit, + uint64_t *returned_fragments, + GNUNET_PSYCSTORE_FragmentCallback cb, + void *cb_cls) +{ + struct Plugin *plugin = cls; + + *returned_fragments = 0; + + struct GNUNET_PQ_QueryParam params_select[] = { + GNUNET_PQ_query_param_auto_from_type (channel_key), + GNUNET_PQ_query_param_uint64 (&fragment_limit), + GNUNET_PQ_query_param_end + }; + + return fragment_select (plugin, + "select_latest_fragments", + params_select, + returned_fragments, + cb, cb_cls); +} + + +/** + * Retrieve all fragments of a message ID range. + * + * @see GNUNET_PSYCSTORE_message_get() + * + * @return #GNUNET_OK on success, else #GNUNET_SYSERR + */ +static int +message_get (void *cls, + const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, + uint64_t first_message_id, + uint64_t last_message_id, + uint64_t fragment_limit, + uint64_t *returned_fragments, + GNUNET_PSYCSTORE_FragmentCallback cb, + void *cb_cls) +{ + struct Plugin *plugin = cls; + struct GNUNET_PQ_QueryParam params_select[] = { + GNUNET_PQ_query_param_auto_from_type (channel_key), + GNUNET_PQ_query_param_uint64 (&first_message_id), + GNUNET_PQ_query_param_uint64 (&last_message_id), + GNUNET_PQ_query_param_uint64 (&fragment_limit), + GNUNET_PQ_query_param_end + }; + + if (0 == fragment_limit) + fragment_limit = INT64_MAX; + *returned_fragments = 0; + return fragment_select (plugin, + "select_messages", + params_select, + returned_fragments, + cb, cb_cls); +} + + +/** + * Retrieve all fragments of the latest messages. + * + * @see GNUNET_PSYCSTORE_message_get_latest() + * + * @return #GNUNET_OK on success, else #GNUNET_SYSERR + */ +static int +message_get_latest (void *cls, + const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, + uint64_t message_limit, + uint64_t *returned_fragments, + GNUNET_PSYCSTORE_FragmentCallback cb, + void *cb_cls) +{ + struct Plugin *plugin = cls; + struct GNUNET_PQ_QueryParam params_select[] = { + GNUNET_PQ_query_param_auto_from_type (channel_key), + GNUNET_PQ_query_param_auto_from_type (channel_key), + GNUNET_PQ_query_param_uint64 (&message_limit), + GNUNET_PQ_query_param_end + }; + + *returned_fragments = 0; + return fragment_select (plugin, + "select_latest_messages", + params_select, + returned_fragments, + cb, cb_cls); +} + + +/** + * Retrieve a fragment of message specified by its message ID and fragment + * offset. + * + * @see GNUNET_PSYCSTORE_message_get_fragment() + * + * @return #GNUNET_OK on success, else #GNUNET_SYSERR + */ +static int +message_get_fragment (void *cls, + const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, + uint64_t message_id, + uint64_t fragment_offset, + GNUNET_PSYCSTORE_FragmentCallback cb, + void *cb_cls) +{ + struct Plugin *plugin = cls; + const char *stmt = "select_message_fragment"; + + struct GNUNET_PQ_QueryParam params_select[] = { + GNUNET_PQ_query_param_auto_from_type (channel_key), + GNUNET_PQ_query_param_uint64 (&message_id), + GNUNET_PQ_query_param_uint64 (&fragment_offset), + GNUNET_PQ_query_param_end + }; + + /* Stack based closure */ + struct FragmentRowsContext frc = { + .cb = cb, + .cb_cls = cb_cls, + .returned_fragments = NULL, + .ret = GNUNET_SYSERR + }; + + if (0 > GNUNET_PQ_eval_prepared_multi_select (plugin->dbh, + stmt, params_select, + &fragment_rows, &frc)) + return GNUNET_SYSERR; + return frc.ret; /* GNUNET_OK ?? */ +} + +/** + * Retrieve the max. values of message counters for a channel. + * + * @see GNUNET_PSYCSTORE_counters_get() + * + * @return #GNUNET_OK on success, else #GNUNET_SYSERR + */ +static int +counters_message_get (void *cls, + const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, + uint64_t *max_fragment_id, + uint64_t *max_message_id, + uint64_t *max_group_generation) +{ + struct Plugin *plugin = cls; + + const char *stmt = "select_counters_message"; + + struct GNUNET_PQ_QueryParam params_select[] = { + GNUNET_PQ_query_param_auto_from_type (channel_key), + GNUNET_PQ_query_param_end + }; + + struct GNUNET_PQ_ResultSpec results_select[] = { + GNUNET_PQ_result_spec_uint64 ("fragment_id", max_fragment_id), + GNUNET_PQ_result_spec_uint64 ("message_id", max_message_id), + GNUNET_PQ_result_spec_uint64 ("group_generation", max_group_generation), + GNUNET_PQ_result_spec_end + }; + + if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != + GNUNET_PQ_eval_prepared_singleton_select (plugin->dbh, stmt, + params_select, results_select)) + return GNUNET_SYSERR; + + return GNUNET_OK; +} + +/** + * Retrieve the max. values of state counters for a channel. + * + * @see GNUNET_PSYCSTORE_counters_get() + * + * @return #GNUNET_OK on success, else #GNUNET_SYSERR + */ +static int +counters_state_get (void *cls, + const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, + uint64_t *max_state_message_id) +{ + struct Plugin *plugin = cls; + + const char *stmt = "select_counters_state"; + + struct GNUNET_PQ_QueryParam params_select[] = { + GNUNET_PQ_query_param_auto_from_type (channel_key), + GNUNET_PQ_query_param_end + }; + + struct GNUNET_PQ_ResultSpec results_select[] = { + GNUNET_PQ_result_spec_uint64 ("max_state_message_id", max_state_message_id), + GNUNET_PQ_result_spec_end + }; + + if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != + GNUNET_PQ_eval_prepared_singleton_select (plugin->dbh, stmt, + params_select, results_select)) + return GNUNET_SYSERR; + + return GNUNET_OK; +} + + +/** + * Assign a value to a state variable. + * + * @return #GNUNET_OK on success, else #GNUNET_SYSERR + */ +static int +state_assign (struct Plugin *plugin, const char *stmt, + const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, + const char *name, const void *value, size_t value_size) +{ + struct GNUNET_PQ_QueryParam params[] = { + GNUNET_PQ_query_param_auto_from_type (channel_key), + GNUNET_PQ_query_param_string (name), + GNUNET_PQ_query_param_fixed_size (value, value_size), + GNUNET_PQ_query_param_end + }; + + if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS != + GNUNET_PQ_eval_prepared_non_select (plugin->dbh, stmt, params)) + return GNUNET_SYSERR; + + return GNUNET_OK; +} + + +static int +update_message_id (struct Plugin *plugin, + const char *stmt, + const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, + uint64_t message_id) +{ + struct GNUNET_PQ_QueryParam params[] = { + GNUNET_PQ_query_param_uint64 (&message_id), + GNUNET_PQ_query_param_auto_from_type (channel_key), + GNUNET_PQ_query_param_end + }; + + if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS != + GNUNET_PQ_eval_prepared_non_select (plugin->dbh, stmt, params)) + return GNUNET_SYSERR; + + return GNUNET_OK; +} + + +/** + * Begin modifying current state. + */ +static int +state_modify_begin (void *cls, + const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, + uint64_t message_id, uint64_t state_delta) +{ + struct Plugin *plugin = cls; + + if (state_delta > 0) + { + /** + * We can only apply state modifiers in the current message if modifiers in + * the previous stateful message (message_id - state_delta) were already + * applied. + */ + + uint64_t max_state_message_id = 0; + int ret = counters_state_get (plugin, channel_key, &max_state_message_id); + switch (ret) + { + case GNUNET_OK: + case GNUNET_NO: // no state yet + ret = GNUNET_OK; + break; + + default: + return ret; + } + + if (max_state_message_id < message_id - state_delta) + return GNUNET_NO; /* some stateful messages not yet applied */ + else if (message_id - state_delta < max_state_message_id) + return GNUNET_NO; /* changes already applied */ + } + + if (TRANSACTION_NONE != plugin->transaction) + { + /** @todo FIXME: wait for other transaction to finish */ + return GNUNET_SYSERR; + } + return transaction_begin (plugin, TRANSACTION_STATE_MODIFY); +} + + +/** + * Set the current value of state variable. + * + * @see GNUNET_PSYCSTORE_state_modify() + * + * @return #GNUNET_OK on success, else #GNUNET_SYSERR + */ +static int +state_modify_op (void *cls, + const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, + enum GNUNET_PSYC_Operator op, + const char *name, const void *value, size_t value_size) +{ + struct Plugin *plugin = cls; + GNUNET_assert (TRANSACTION_STATE_MODIFY == plugin->transaction); + + switch (op) + { + case GNUNET_PSYC_OP_ASSIGN: + return state_assign (plugin, "insert_state_current", + channel_key, name, value, value_size); + + default: /** @todo implement more state operations */ + GNUNET_break (0); + return GNUNET_SYSERR; + } +} + + +/** + * End modifying current state. + */ +static int +state_modify_end (void *cls, + const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, + uint64_t message_id) +{ + struct Plugin *plugin = cls; + GNUNET_assert (TRANSACTION_STATE_MODIFY == plugin->transaction); + + return + GNUNET_OK == exec_channel (plugin, "delete_state_empty", channel_key) + && GNUNET_OK == update_message_id (plugin, + "update_max_state_message_id", + channel_key, message_id) + && GNUNET_OK == transaction_commit (plugin) + ? GNUNET_OK : GNUNET_SYSERR; +} + + +/** + * Begin state synchronization. + */ +static int +state_sync_begin (void *cls, + const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key) +{ + struct Plugin *plugin = cls; + return exec_channel (plugin, "delete_state_sync", channel_key); +} + + +/** + * Assign current value of a state variable. + * + * @see GNUNET_PSYCSTORE_state_modify() + * + * @return #GNUNET_OK on success, else #GNUNET_SYSERR + */ +static int +state_sync_assign (void *cls, + const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, + const char *name, const void *value, size_t value_size) +{ + struct Plugin *plugin = cls; + return state_assign (plugin, "insert_state_sync", + channel_key, name, value, value_size); +} + + +/** + * End modifying current state. + */ +static int +state_sync_end (void *cls, + const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, + uint64_t max_state_message_id, + uint64_t state_hash_message_id) +{ + struct Plugin *plugin = cls; + int ret = GNUNET_SYSERR; + + if (TRANSACTION_NONE != plugin->transaction) + { + /** @todo FIXME: wait for other transaction to finish */ + return GNUNET_SYSERR; + } + + GNUNET_OK == transaction_begin (plugin, TRANSACTION_STATE_SYNC) + && GNUNET_OK == exec_channel (plugin, "delete_state", channel_key) + && GNUNET_OK == exec_channel (plugin, "insert_state_from_sync", + channel_key) + && GNUNET_OK == exec_channel (plugin, "delete_state_sync", + channel_key) + && GNUNET_OK == update_message_id (plugin, + "update_state_hash_message_id", + channel_key, state_hash_message_id) + && GNUNET_OK == update_message_id (plugin, + "update_max_state_message_id", + channel_key, max_state_message_id) + && GNUNET_OK == transaction_commit (plugin) + ? ret = GNUNET_OK + : transaction_rollback (plugin); + return ret; +} + + +/** + * Delete the whole state. + * + * @see GNUNET_PSYCSTORE_state_reset() + * + * @return #GNUNET_OK on success, else #GNUNET_SYSERR + */ +static int +state_reset (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key) +{ + struct Plugin *plugin = cls; + return exec_channel (plugin, "delete_state", channel_key); +} + + +/** + * Update signed values of state variables in the state store. + * + * @see GNUNET_PSYCSTORE_state_hash_update() + * + * @return #GNUNET_OK on success, else #GNUNET_SYSERR + */ +static int +state_update_signed (void *cls, + const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key) +{ + struct Plugin *plugin = cls; + return exec_channel (plugin, "update_state_signed", channel_key); +} + + +/** + * Retrieve a state variable by name. + * + * @see GNUNET_PSYCSTORE_state_get() + * + * @return #GNUNET_OK on success, else #GNUNET_SYSERR + */ +static int +state_get (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, + const char *name, GNUNET_PSYCSTORE_StateCallback cb, void *cb_cls) +{ + struct Plugin *plugin = cls; + + const char *stmt = "select_state_one"; + + struct GNUNET_PQ_QueryParam params_select[] = { + GNUNET_PQ_query_param_auto_from_type (channel_key), + GNUNET_PQ_query_param_string (name), + GNUNET_PQ_query_param_end + }; + + void *value_current = NULL; + size_t value_size = 0; + + struct GNUNET_PQ_ResultSpec results_select[] = { + GNUNET_PQ_result_spec_variable_size ("value_current", &value_current, &value_size), + GNUNET_PQ_result_spec_end + }; + + if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != + GNUNET_PQ_eval_prepared_singleton_select (plugin->dbh, stmt, + params_select, results_select)) + return GNUNET_SYSERR; + + return cb (cb_cls, name, value_current, + value_size); +} + + + +/** + * Closure for #get_state_cb. + */ +struct GetStateContext { + const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key; + // const char *name, + GNUNET_PSYCSTORE_StateCallback cb; + void *cb_cls; + + const char *value_id; + + /* I preserved this but I do not see the point since + * it cannot stop the loop early and gets overwritten ?? */ + int ret; +}; + + +/** + * Callback that retrieves the results of a SELECT statement + * reading form the state table. + * + * Only passed to GNUNET_PQ_eval_prepared_multi_select and + * has type GNUNET_PQ_PostgresResultHandler. + * + * @param cls closure + * @param result the postgres result + * @param num_result the number of results in @a result + */ +static void +get_state_cb (void *cls, + PGresult *res, + unsigned int num_results) +{ + struct GetStateContext *c = cls; + + for (unsigned int i=0;i<num_results;i++) + { + char *name = ""; + void *value = NULL; + size_t value_size = 0; + + struct GNUNET_PQ_ResultSpec results[] = { + GNUNET_PQ_result_spec_string ("name", &name), + GNUNET_PQ_result_spec_variable_size (c->value_id, &value, &value_size), + GNUNET_PQ_result_spec_end + }; + + if (GNUNET_YES != GNUNET_PQ_extract_result (res, results, i)) + { + GNUNET_PQ_cleanup_result(results); /* previously invoked via PQclear?? */ + break; /* nothing more?? */ + } + + c->ret = c->cb (c->cb_cls, (const char *) name, value, value_size); + GNUNET_PQ_cleanup_result(results); + } +} + +/** + * Retrieve all state variables for a channel with the given prefix. + * + * @see GNUNET_PSYCSTORE_state_get_prefix() + * + * @return #GNUNET_OK on success, else #GNUNET_SYSERR + */ +static int +state_get_prefix (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, + const char *name, GNUNET_PSYCSTORE_StateCallback cb, + void *cb_cls) +{ + struct Plugin *plugin = cls; + + const char *stmt = "select_state_prefix"; + + uint32_t name_len = (uint32_t) strlen (name); + + struct GNUNET_PQ_QueryParam params_select[] = { + GNUNET_PQ_query_param_auto_from_type (channel_key), + GNUNET_PQ_query_param_string (name), + GNUNET_PQ_query_param_uint32 (&name_len), + GNUNET_PQ_query_param_string (name), + GNUNET_PQ_query_param_end + }; + + struct GetStateContext gsc = { + .cb = cb, + .cb_cls = cb_cls, + .value_id = "value_current", + .ret = GNUNET_NO + }; + + if (0 > GNUNET_PQ_eval_prepared_multi_select (plugin->dbh, + stmt, params_select, + &get_state_cb, &gsc)) + return GNUNET_SYSERR; + return gsc.ret; /* GNUNET_OK ?? */ +} + + +/** + * Retrieve all signed state variables for a channel. + * + * @see GNUNET_PSYCSTORE_state_get_signed() + * + * @return #GNUNET_OK on success, else #GNUNET_SYSERR + */ +static int +state_get_signed (void *cls, + const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, + GNUNET_PSYCSTORE_StateCallback cb, void *cb_cls) +{ + struct Plugin *plugin = cls; + + const char *stmt = "select_state_signed"; + + struct GNUNET_PQ_QueryParam params_select[] = { + GNUNET_PQ_query_param_auto_from_type (channel_key), + GNUNET_PQ_query_param_end + }; + + struct GetStateContext gsc = { + .cb = cb, + .cb_cls = cb_cls, + .value_id = "value_signed", + .ret = GNUNET_NO + }; + + if (0 > GNUNET_PQ_eval_prepared_multi_select (plugin->dbh, + stmt, params_select, + &get_state_cb, &gsc)) + return GNUNET_SYSERR; + return gsc.ret; /* GNUNET_OK ?? */ +} + + +/** + * Entry point for the plugin. + * + * @param cls The struct GNUNET_CONFIGURATION_Handle. + * @return NULL on error, otherwise the plugin context + */ +void * +libgnunet_plugin_psycstore_postgres_init (void *cls) +{ + static struct Plugin plugin; + const struct GNUNET_CONFIGURATION_Handle *cfg = cls; + struct GNUNET_PSYCSTORE_PluginFunctions *api; + + if (NULL != plugin.cfg) + return NULL; /* can only initialize once! */ + memset (&plugin, 0, sizeof (struct Plugin)); + plugin.cfg = cfg; + if (GNUNET_OK != database_setup (&plugin)) + { + database_shutdown (&plugin); + return NULL; + } + api = GNUNET_new (struct GNUNET_PSYCSTORE_PluginFunctions); + api->cls = &plugin; + api->membership_store = &postgres_membership_store; + api->membership_test = &membership_test; + api->fragment_store = &fragment_store; + api->message_add_flags = &message_add_flags; + api->fragment_get = &fragment_get; + api->fragment_get_latest = &fragment_get_latest; + api->message_get = &message_get; + api->message_get_latest = &message_get_latest; + api->message_get_fragment = &message_get_fragment; + api->counters_message_get = &counters_message_get; + api->counters_state_get = &counters_state_get; + api->state_modify_begin = &state_modify_begin; + api->state_modify_op = &state_modify_op; + api->state_modify_end = &state_modify_end; + api->state_sync_begin = &state_sync_begin; + api->state_sync_assign = &state_sync_assign; + api->state_sync_end = &state_sync_end; + api->state_reset = &state_reset; + api->state_update_signed = &state_update_signed; + api->state_get = &state_get; + api->state_get_prefix = &state_get_prefix; + api->state_get_signed = &state_get_signed; + + LOG (GNUNET_ERROR_TYPE_INFO, _("Postgres database running\n")); + return api; +} + + +/** + * Exit point from the plugin. + * + * @param cls The plugin context (as returned by "init") + * @return Always NULL + */ +void * +libgnunet_plugin_psycstore_postgres_done (void *cls) +{ + struct GNUNET_PSYCSTORE_PluginFunctions *api = cls; + struct Plugin *plugin = api->cls; + + database_shutdown (plugin); + plugin->cfg = NULL; + GNUNET_free (api); + LOG (GNUNET_ERROR_TYPE_DEBUG, "Postgres plugin has finished\n"); + return NULL; +} + +/* end of plugin_psycstore_postgres.c */ diff --git a/src/psycstore/plugin_psycstore_sqlite.c b/src/psycstore/plugin_psycstore_sqlite.c new file mode 100644 index 0000000..24de383 --- /dev/null +++ b/src/psycstore/plugin_psycstore_sqlite.c @@ -0,0 +1,1948 @@ +/* + * This file is part of GNUnet + * Copyright (C) 2013 GNUnet e.V. + * + * GNUnet is free software: you can redistribute it and/or modify it + * under the terms of the GNU Affero General Public License as published + * by the Free Software Foundation, either version 3 of the License, + * or (at your option) any later version. + * + * GNUnet is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + + SPDX-License-Identifier: AGPL3.0-or-later + */ + +/** + * @file psycstore/plugin_psycstore_sqlite.c + * @brief sqlite-based psycstore backend + * @author Gabor X Toth + * @author Christian Grothoff + */ + +/* + * FIXME: SQLite3 only supports signed 64-bit integers natively, + * thus it can only store 63 bits of the uint64_t's. + */ + +#include "platform.h" +#include "gnunet_psycstore_plugin.h" +#include "gnunet_psycstore_service.h" +#include "gnunet_multicast_service.h" +#include "gnunet_crypto_lib.h" +#include "gnunet_psyc_util_lib.h" +#include "psycstore.h" +#include <sqlite3.h> + +/** + * After how many ms "busy" should a DB operation fail for good? A + * low value makes sure that we are more responsive to requests + * (especially PUTs). A high value guarantees a higher success rate + * (SELECTs in iterate can take several seconds despite LIMIT=1). + * + * The default value of 1s should ensure that users do not experience + * huge latencies while at the same time allowing operations to + * succeed with reasonable probability. + */ +#define BUSY_TIMEOUT_MS 1000 + +#define DEBUG_PSYCSTORE GNUNET_EXTRA_LOGGING + +/** + * Log an error message at log-level 'level' that indicates + * a failure of the command 'cmd' on file 'filename' + * with the message given by strerror(errno). + */ +#define LOG_SQLITE(db, level, cmd) do { GNUNET_log_from (level, "psycstore-sqlite", _("`%s' failed at %s:%d with error: %s (%d)\n"), cmd, __FILE__, __LINE__, sqlite3_errmsg(db->dbh), sqlite3_errcode(db->dbh)); } while(0) + +#define LOG(kind,...) GNUNET_log_from (kind, "psycstore-sqlite", __VA_ARGS__) + +enum Transactions { + TRANSACTION_NONE = 0, + TRANSACTION_STATE_MODIFY, + TRANSACTION_STATE_SYNC, +}; + +/** + * Context for all functions in this plugin. + */ +struct Plugin +{ + + const struct GNUNET_CONFIGURATION_Handle *cfg; + + /** + * Database filename. + */ + char *fn; + + /** + * Native SQLite database handle. + */ + sqlite3 *dbh; + + /** + * Current transaction. + */ + enum Transactions transaction; + + sqlite3_stmt *transaction_begin; + + sqlite3_stmt *transaction_commit; + + sqlite3_stmt *transaction_rollback; + + /** + * Precompiled SQL for channel_key_store() + */ + sqlite3_stmt *insert_channel_key; + + /** + * Precompiled SQL for slave_key_store() + */ + sqlite3_stmt *insert_slave_key; + + + /** + * Precompiled SQL for membership_store() + */ + sqlite3_stmt *insert_membership; + + /** + * Precompiled SQL for membership_test() + */ + sqlite3_stmt *select_membership; + + + /** + * Precompiled SQL for fragment_store() + */ + sqlite3_stmt *insert_fragment; + + /** + * Precompiled SQL for message_add_flags() + */ + sqlite3_stmt *update_message_flags; + + /** + * Precompiled SQL for fragment_get() + */ + sqlite3_stmt *select_fragments; + + /** + * Precompiled SQL for fragment_get() + */ + sqlite3_stmt *select_latest_fragments; + + /** + * Precompiled SQL for message_get() + */ + sqlite3_stmt *select_messages; + + /** + * Precompiled SQL for message_get() + */ + sqlite3_stmt *select_latest_messages; + + /** + * Precompiled SQL for message_get_fragment() + */ + sqlite3_stmt *select_message_fragment; + + /** + * Precompiled SQL for counters_get_message() + */ + sqlite3_stmt *select_counters_message; + + /** + * Precompiled SQL for counters_get_state() + */ + sqlite3_stmt *select_counters_state; + + /** + * Precompiled SQL for state_modify_end() + */ + sqlite3_stmt *update_state_hash_message_id; + + /** + * Precompiled SQL for state_sync_end() + */ + sqlite3_stmt *update_max_state_message_id; + + /** + * Precompiled SQL for state_modify_op() + */ + sqlite3_stmt *insert_state_current; + + /** + * Precompiled SQL for state_modify_end() + */ + sqlite3_stmt *delete_state_empty; + + /** + * Precompiled SQL for state_set_signed() + */ + sqlite3_stmt *update_state_signed; + + /** + * Precompiled SQL for state_sync() + */ + sqlite3_stmt *insert_state_sync; + + /** + * Precompiled SQL for state_sync() + */ + sqlite3_stmt *delete_state; + + /** + * Precompiled SQL for state_sync() + */ + sqlite3_stmt *insert_state_from_sync; + + /** + * Precompiled SQL for state_sync() + */ + sqlite3_stmt *delete_state_sync; + + /** + * Precompiled SQL for state_get_signed() + */ + sqlite3_stmt *select_state_signed; + + /** + * Precompiled SQL for state_get() + */ + sqlite3_stmt *select_state_one; + + /** + * Precompiled SQL for state_get_prefix() + */ + sqlite3_stmt *select_state_prefix; + +}; + +#if DEBUG_PSYCSTORE + +static void +sql_trace (void *cls, const char *sql) +{ + LOG (GNUNET_ERROR_TYPE_DEBUG, "SQL query:\n%s\n", sql); +} + +#endif + +/** + * @brief Prepare a SQL statement + * + * @param dbh handle to the database + * @param sql SQL statement, UTF-8 encoded + * @param stmt set to the prepared statement + * @return 0 on success + */ +static int +sql_prepare (sqlite3 *dbh, const char *sql, sqlite3_stmt **stmt) +{ + char *tail; + int result; + + result = sqlite3_prepare_v2 (dbh, sql, strlen (sql), stmt, + (const char **) &tail); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Prepared `%s' / %p: %d\n", sql, *stmt, result); + if (result != SQLITE_OK) + LOG (GNUNET_ERROR_TYPE_ERROR, + _("Error preparing SQL query: %s\n %s\n"), + sqlite3_errmsg (dbh), sql); + return result; +} + + +/** + * @brief Prepare a SQL statement + * + * @param dbh handle to the database + * @param sql SQL statement, UTF-8 encoded + * @return 0 on success + */ +static int +sql_exec (sqlite3 *dbh, const char *sql) +{ + int result; + + result = sqlite3_exec (dbh, sql, NULL, NULL, NULL); + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Executed `%s' / %d\n", sql, result); + if (result != SQLITE_OK) + LOG (GNUNET_ERROR_TYPE_ERROR, + _("Error executing SQL query: %s\n %s\n"), + sqlite3_errmsg (dbh), sql); + return result; +} + + +/** + * Initialize the database connections and associated + * data structures (create tables and indices + * as needed as well). + * + * @param plugin the plugin context (state for this module) + * @return GNUNET_OK on success + */ +static int +database_setup (struct Plugin *plugin) +{ + char *filename; + + if (GNUNET_OK != + GNUNET_CONFIGURATION_get_value_filename (plugin->cfg, "psycstore-sqlite", + "FILENAME", &filename)) + { + GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR, + "psycstore-sqlite", "FILENAME"); + return GNUNET_SYSERR; + } + if (GNUNET_OK != GNUNET_DISK_file_test (filename)) + { + if (GNUNET_OK != GNUNET_DISK_directory_create_for_file (filename)) + { + GNUNET_break (0); + GNUNET_free (filename); + return GNUNET_SYSERR; + } + } + /* filename should be UTF-8-encoded. If it isn't, it's a bug */ + plugin->fn = filename; + + /* Open database and precompile statements */ + if (SQLITE_OK != sqlite3_open (plugin->fn, &plugin->dbh)) + { + LOG (GNUNET_ERROR_TYPE_ERROR, + _("Unable to initialize SQLite: %s.\n"), + sqlite3_errmsg (plugin->dbh)); + return GNUNET_SYSERR; + } + +#if DEBUG_PSYCSTORE + sqlite3_trace (plugin->dbh, &sql_trace, NULL); +#endif + + sql_exec (plugin->dbh, "PRAGMA temp_store=MEMORY"); + sql_exec (plugin->dbh, "PRAGMA synchronous=NORMAL"); + sql_exec (plugin->dbh, "PRAGMA legacy_file_format=OFF"); + sql_exec (plugin->dbh, "PRAGMA auto_vacuum=INCREMENTAL"); + sql_exec (plugin->dbh, "PRAGMA encoding=\"UTF-8\""); +#if ! DEBUG_PSYCSTORE + sql_exec (plugin->dbh, "PRAGMA locking_mode=EXCLUSIVE"); +#endif + sql_exec (plugin->dbh, "PRAGMA page_size=4096"); + + sqlite3_busy_timeout (plugin->dbh, BUSY_TIMEOUT_MS); + + /* Create tables */ + + sql_exec (plugin->dbh, + "CREATE TABLE IF NOT EXISTS channels (\n" + " id INTEGER PRIMARY KEY,\n" + " pub_key BLOB(32) UNIQUE,\n" + " max_state_message_id INTEGER,\n" // last applied state message ID + " state_hash_message_id INTEGER\n" // last message ID with a state hash + ");"); + + sql_exec (plugin->dbh, + "CREATE TABLE IF NOT EXISTS slaves (\n" + " id INTEGER PRIMARY KEY,\n" + " pub_key BLOB(32) UNIQUE\n" + ");"); + + sql_exec (plugin->dbh, + "CREATE TABLE IF NOT EXISTS membership (\n" + " channel_id INTEGER NOT NULL REFERENCES channels(id),\n" + " slave_id INTEGER NOT NULL REFERENCES slaves(id),\n" + " did_join INTEGER NOT NULL,\n" + " announced_at INTEGER NOT NULL,\n" + " effective_since INTEGER NOT NULL,\n" + " group_generation INTEGER NOT NULL\n" + ");"); + sql_exec (plugin->dbh, + "CREATE INDEX IF NOT EXISTS idx_membership_channel_id_slave_id " + "ON membership (channel_id, slave_id);"); + + /** @todo messages table: add method_name column */ + sql_exec (plugin->dbh, + "CREATE TABLE IF NOT EXISTS messages (\n" + " channel_id INTEGER NOT NULL REFERENCES channels(id),\n" + " hop_counter INTEGER NOT NULL,\n" + " signature BLOB,\n" + " purpose BLOB,\n" + " fragment_id INTEGER NOT NULL,\n" + " fragment_offset INTEGER NOT NULL,\n" + " message_id INTEGER NOT NULL,\n" + " group_generation INTEGER NOT NULL,\n" + " multicast_flags INTEGER NOT NULL,\n" + " psycstore_flags INTEGER NOT NULL,\n" + " data BLOB,\n" + " PRIMARY KEY (channel_id, fragment_id),\n" + " UNIQUE (channel_id, message_id, fragment_offset)\n" + ");"); + + sql_exec (plugin->dbh, + "CREATE TABLE IF NOT EXISTS state (\n" + " channel_id INTEGER NOT NULL REFERENCES channels(id),\n" + " name TEXT NOT NULL,\n" + " value_current BLOB,\n" + " value_signed BLOB,\n" + " PRIMARY KEY (channel_id, name)\n" + ");"); + + sql_exec (plugin->dbh, + "CREATE TABLE IF NOT EXISTS state_sync (\n" + " channel_id INTEGER NOT NULL REFERENCES channels(id),\n" + " name TEXT NOT NULL,\n" + " value BLOB,\n" + " PRIMARY KEY (channel_id, name)\n" + ");"); + + /* Prepare statements */ + + sql_prepare (plugin->dbh, "BEGIN;", &plugin->transaction_begin); + + sql_prepare (plugin->dbh, "COMMIT;", &plugin->transaction_commit); + + sql_prepare (plugin->dbh, "ROLLBACK;", &plugin->transaction_rollback); + + sql_prepare (plugin->dbh, + "INSERT OR IGNORE INTO channels (pub_key) VALUES (?);", + &plugin->insert_channel_key); + + sql_prepare (plugin->dbh, + "INSERT OR IGNORE INTO slaves (pub_key) VALUES (?);", + &plugin->insert_slave_key); + + sql_prepare (plugin->dbh, + "INSERT INTO membership\n" + " (channel_id, slave_id, did_join, announced_at,\n" + " effective_since, group_generation)\n" + "VALUES ((SELECT id FROM channels WHERE pub_key = ?),\n" + " (SELECT id FROM slaves WHERE pub_key = ?),\n" + " ?, ?, ?, ?);", + &plugin->insert_membership); + + sql_prepare (plugin->dbh, + "SELECT did_join FROM membership\n" + "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n" + " AND slave_id = (SELECT id FROM slaves WHERE pub_key = ?)\n" + " AND effective_since <= ? AND did_join = 1\n" + "ORDER BY announced_at DESC LIMIT 1;", + &plugin->select_membership); + + sql_prepare (plugin->dbh, + "INSERT OR IGNORE INTO messages\n" + " (channel_id, hop_counter, signature, purpose,\n" + " fragment_id, fragment_offset, message_id,\n" + " group_generation, multicast_flags, psycstore_flags, data)\n" + "VALUES ((SELECT id FROM channels WHERE pub_key = ?),\n" + " ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);", + &plugin->insert_fragment); + + sql_prepare (plugin->dbh, + "UPDATE messages\n" + "SET psycstore_flags = psycstore_flags | ?\n" + "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n" + " AND message_id = ? AND fragment_offset = 0;", + &plugin->update_message_flags); + + sql_prepare (plugin->dbh, + "SELECT hop_counter, signature, purpose, fragment_id,\n" + " fragment_offset, message_id, group_generation,\n" + " multicast_flags, psycstore_flags, data\n" + "FROM messages\n" + "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n" + " AND ? <= fragment_id AND fragment_id <= ?;", + &plugin->select_fragments); + + /** @todo select_messages: add method_prefix filter */ + sql_prepare (plugin->dbh, + "SELECT hop_counter, signature, purpose, fragment_id,\n" + " fragment_offset, message_id, group_generation,\n" + " multicast_flags, psycstore_flags, data\n" + "FROM messages\n" + "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n" + " AND ? <= message_id AND message_id <= ?" + "LIMIT ?;", + &plugin->select_messages); + + sql_prepare (plugin->dbh, + "SELECT * FROM\n" + "(SELECT hop_counter, signature, purpose, fragment_id,\n" + " fragment_offset, message_id, group_generation,\n" + " multicast_flags, psycstore_flags, data\n" + " FROM messages\n" + " WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n" + " ORDER BY fragment_id DESC\n" + " LIMIT ?)\n" + "ORDER BY fragment_id;", + &plugin->select_latest_fragments); + + /** @todo select_latest_messages: add method_prefix filter */ + sql_prepare (plugin->dbh, + "SELECT hop_counter, signature, purpose, fragment_id,\n" + " fragment_offset, message_id, group_generation,\n" + " multicast_flags, psycstore_flags, data\n" + "FROM messages\n" + "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n" + " AND message_id IN\n" + " (SELECT message_id\n" + " FROM messages\n" + " WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n" + " GROUP BY message_id\n" + " ORDER BY message_id\n" + " DESC LIMIT ?)\n" + "ORDER BY fragment_id;", + &plugin->select_latest_messages); + + sql_prepare (plugin->dbh, + "SELECT hop_counter, signature, purpose, fragment_id,\n" + " fragment_offset, message_id, group_generation,\n" + " multicast_flags, psycstore_flags, data\n" + "FROM messages\n" + "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n" + " AND message_id = ? AND fragment_offset = ?;", + &plugin->select_message_fragment); + + sql_prepare (plugin->dbh, + "SELECT fragment_id, message_id, group_generation\n" + "FROM messages\n" + "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n" + "ORDER BY fragment_id DESC LIMIT 1;", + &plugin->select_counters_message); + + sql_prepare (plugin->dbh, + "SELECT max_state_message_id\n" + "FROM channels\n" + "WHERE pub_key = ? AND max_state_message_id IS NOT NULL;", + &plugin->select_counters_state); + + sql_prepare (plugin->dbh, + "UPDATE channels\n" + "SET max_state_message_id = ?\n" + "WHERE pub_key = ?;", + &plugin->update_max_state_message_id); + + sql_prepare (plugin->dbh, + "UPDATE channels\n" + "SET state_hash_message_id = ?\n" + "WHERE pub_key = ?;", + &plugin->update_state_hash_message_id); + + sql_prepare (plugin->dbh, + "INSERT OR REPLACE INTO state\n" + " (channel_id, name, value_current, value_signed)\n" + "SELECT new.channel_id, new.name,\n" + " new.value_current, old.value_signed\n" + "FROM (SELECT (SELECT id FROM channels WHERE pub_key = ?)\n" + " AS channel_id,\n" + " ? AS name, ? AS value_current) AS new\n" + "LEFT JOIN (SELECT channel_id, name, value_signed\n" + " FROM state) AS old\n" + "ON new.channel_id = old.channel_id AND new.name = old.name;", + &plugin->insert_state_current); + + sql_prepare (plugin->dbh, + "DELETE FROM state\n" + "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n" + " AND (value_current IS NULL OR length(value_current) = 0)\n" + " AND (value_signed IS NULL OR length(value_signed) = 0);", + &plugin->delete_state_empty); + + sql_prepare (plugin->dbh, + "UPDATE state\n" + "SET value_signed = value_current\n" + "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?);", + &plugin->update_state_signed); + + sql_prepare (plugin->dbh, + "DELETE FROM state\n" + "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?);", + &plugin->delete_state); + + sql_prepare (plugin->dbh, + "INSERT INTO state_sync (channel_id, name, value)\n" + "VALUES ((SELECT id FROM channels WHERE pub_key = ?), ?, ?);", + &plugin->insert_state_sync); + + sql_prepare (plugin->dbh, + "INSERT INTO state\n" + " (channel_id, name, value_current, value_signed)\n" + "SELECT channel_id, name, value, value\n" + "FROM state_sync\n" + "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?);", + &plugin->insert_state_from_sync); + + sql_prepare (plugin->dbh, + "DELETE FROM state_sync\n" + "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?);", + &plugin->delete_state_sync); + + sql_prepare (plugin->dbh, + "SELECT value_current\n" + "FROM state\n" + "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n" + " AND name = ?;", + &plugin->select_state_one); + + sql_prepare (plugin->dbh, + "SELECT name, value_current\n" + "FROM state\n" + "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)\n" + " AND (name = ? OR substr(name, 1, ?) = ?);", + &plugin->select_state_prefix); + + sql_prepare (plugin->dbh, + "SELECT name, value_signed\n" + "FROM state\n" + "WHERE channel_id = (SELECT id FROM channels WHERE pub_key = ?)" + " AND value_signed IS NOT NULL;", + &plugin->select_state_signed); + + return GNUNET_OK; +} + + +/** + * Shutdown database connection and associate data + * structures. + * @param plugin the plugin context (state for this module) + */ +static void +database_shutdown (struct Plugin *plugin) +{ + int result; + sqlite3_stmt *stmt; + while (NULL != (stmt = sqlite3_next_stmt (plugin->dbh, NULL))) + { + result = sqlite3_finalize (stmt); + if (SQLITE_OK != result) + LOG (GNUNET_ERROR_TYPE_WARNING, + "Failed to close statement %p: %d\n", stmt, result); + } + if (SQLITE_OK != sqlite3_close (plugin->dbh)) + LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR, "sqlite3_close"); + + GNUNET_free_non_null (plugin->fn); +} + +/** + * Execute a prepared statement with a @a channel_key argument. + * + * @param plugin Plugin handle. + * @param stmt Statement to execute. + * @param channel_key Public key of the channel. + * + * @return #GNUNET_OK on success, else #GNUNET_SYSERR + */ +static int +exec_channel (struct Plugin *plugin, sqlite3_stmt *stmt, + const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key) +{ + if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key, + sizeof (*channel_key), SQLITE_STATIC)) + { + LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, + "sqlite3_bind"); + } + else if (SQLITE_DONE != sqlite3_step (stmt)) + { + LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, + "sqlite3_step"); + } + + if (SQLITE_OK != sqlite3_reset (stmt)) + { + LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, + "sqlite3_reset"); + return GNUNET_SYSERR; + } + + return GNUNET_OK; +} + +/** + * Begin a transaction. + */ +static int +transaction_begin (struct Plugin *plugin, enum Transactions transaction) +{ + sqlite3_stmt *stmt = plugin->transaction_begin; + + if (SQLITE_DONE != sqlite3_step (stmt)) + { + LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, + "sqlite3_step"); + } + if (SQLITE_OK != sqlite3_reset (stmt)) + { + LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, + "sqlite3_reset"); + return GNUNET_SYSERR; + } + + plugin->transaction = transaction; + return GNUNET_OK; +} + + +/** + * Commit current transaction. + */ +static int +transaction_commit (struct Plugin *plugin) +{ + sqlite3_stmt *stmt = plugin->transaction_commit; + + if (SQLITE_DONE != sqlite3_step (stmt)) + { + LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, + "sqlite3_step"); + } + if (SQLITE_OK != sqlite3_reset (stmt)) + { + LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, + "sqlite3_reset"); + return GNUNET_SYSERR; + } + + plugin->transaction = TRANSACTION_NONE; + return GNUNET_OK; +} + + +/** + * Roll back current transaction. + */ +static int +transaction_rollback (struct Plugin *plugin) +{ + sqlite3_stmt *stmt = plugin->transaction_rollback; + + if (SQLITE_DONE != sqlite3_step (stmt)) + { + LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, + "sqlite3_step"); + } + if (SQLITE_OK != sqlite3_reset (stmt)) + { + LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, + "sqlite3_reset"); + return GNUNET_SYSERR; + } + plugin->transaction = TRANSACTION_NONE; + return GNUNET_OK; +} + + +static int +channel_key_store (struct Plugin *plugin, + const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key) +{ + sqlite3_stmt *stmt = plugin->insert_channel_key; + + if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key, + sizeof (*channel_key), SQLITE_STATIC)) + { + LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, + "sqlite3_bind"); + } + else if (SQLITE_DONE != sqlite3_step (stmt)) + { + LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, + "sqlite3_step"); + } + + if (SQLITE_OK != sqlite3_reset (stmt)) + { + LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, + "sqlite3_reset"); + return GNUNET_SYSERR; + } + + return GNUNET_OK; +} + + +static int +slave_key_store (struct Plugin *plugin, + const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key) +{ + sqlite3_stmt *stmt = plugin->insert_slave_key; + + if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, slave_key, + sizeof (*slave_key), SQLITE_STATIC)) + { + LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, + "sqlite3_bind"); + } + else if (SQLITE_DONE != sqlite3_step (stmt)) + { + LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, + "sqlite3_step"); + } + + + if (SQLITE_OK != sqlite3_reset (stmt)) + { + LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, + "sqlite3_reset"); + return GNUNET_SYSERR; + } + + return GNUNET_OK; +} + + +/** + * Store join/leave events for a PSYC channel in order to be able to answer + * membership test queries later. + * + * @see GNUNET_PSYCSTORE_membership_store() + * + * @return #GNUNET_OK on success, else #GNUNET_SYSERR + */ +static int +sqlite_membership_store (void *cls, + const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, + const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key, + int did_join, + uint64_t announced_at, + uint64_t effective_since, + uint64_t group_generation) +{ + struct Plugin *plugin = cls; + sqlite3_stmt *stmt = plugin->insert_membership; + + GNUNET_assert (TRANSACTION_NONE == plugin->transaction); + + if (announced_at > INT64_MAX || + effective_since > INT64_MAX || + group_generation > INT64_MAX) + { + GNUNET_break (0); + return GNUNET_SYSERR; + } + + if (GNUNET_OK != channel_key_store (plugin, channel_key) + || GNUNET_OK != slave_key_store (plugin, slave_key)) + return GNUNET_SYSERR; + + if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key, + sizeof (*channel_key), SQLITE_STATIC) + || SQLITE_OK != sqlite3_bind_blob (stmt, 2, slave_key, + sizeof (*slave_key), SQLITE_STATIC) + || SQLITE_OK != sqlite3_bind_int (stmt, 3, did_join) + || SQLITE_OK != sqlite3_bind_int64 (stmt, 4, announced_at) + || SQLITE_OK != sqlite3_bind_int64 (stmt, 5, effective_since) + || SQLITE_OK != sqlite3_bind_int64 (stmt, 6, group_generation)) + { + LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, + "sqlite3_bind"); + } + else if (SQLITE_DONE != sqlite3_step (stmt)) + { + LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, + "sqlite3_step"); + } + + if (SQLITE_OK != sqlite3_reset (stmt)) + { + LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, + "sqlite3_reset"); + return GNUNET_SYSERR; + } + + return GNUNET_OK; +} + +/** + * Test if a member was admitted to the channel at the given message ID. + * + * @see GNUNET_PSYCSTORE_membership_test() + * + * @return #GNUNET_YES if the member was admitted, #GNUNET_NO if not, + * #GNUNET_SYSERR if there was en error. + */ +static int +membership_test (void *cls, + const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, + const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key, + uint64_t message_id) +{ + struct Plugin *plugin = cls; + sqlite3_stmt *stmt = plugin->select_membership; + int ret = GNUNET_SYSERR; + + if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key, + sizeof (*channel_key), SQLITE_STATIC) + || SQLITE_OK != sqlite3_bind_blob (stmt, 2, slave_key, + sizeof (*slave_key), SQLITE_STATIC) + || SQLITE_OK != sqlite3_bind_int64 (stmt, 3, message_id)) + { + LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, + "sqlite3_bind"); + } + else + { + switch (sqlite3_step (stmt)) + { + case SQLITE_DONE: + ret = GNUNET_NO; + break; + case SQLITE_ROW: + ret = GNUNET_YES; + } + } + + if (SQLITE_OK != sqlite3_reset (stmt)) + { + LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, + "sqlite3_reset"); + } + + return ret; +} + +/** + * Store a message fragment sent to a channel. + * + * @see GNUNET_PSYCSTORE_fragment_store() + * + * @return #GNUNET_OK on success, else #GNUNET_SYSERR + */ +static int +fragment_store (void *cls, + const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, + const struct GNUNET_MULTICAST_MessageHeader *msg, + uint32_t psycstore_flags) +{ + struct Plugin *plugin = cls; + sqlite3_stmt *stmt = plugin->insert_fragment; + + GNUNET_assert (TRANSACTION_NONE == plugin->transaction); + + uint64_t fragment_id = GNUNET_ntohll (msg->fragment_id); + uint64_t fragment_offset = GNUNET_ntohll (msg->fragment_offset); + uint64_t message_id = GNUNET_ntohll (msg->message_id); + uint64_t group_generation = GNUNET_ntohll (msg->group_generation); + + if (fragment_id > INT64_MAX || fragment_offset > INT64_MAX || + message_id > INT64_MAX || group_generation > INT64_MAX) + { + LOG (GNUNET_ERROR_TYPE_ERROR, + "Tried to store fragment with a field > INT64_MAX: " + "%lu, %lu, %lu, %lu\n", fragment_id, fragment_offset, + message_id, group_generation); + GNUNET_break (0); + return GNUNET_SYSERR; + } + + if (GNUNET_OK != channel_key_store (plugin, channel_key)) + return GNUNET_SYSERR; + + if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key, + sizeof (*channel_key), SQLITE_STATIC) + || SQLITE_OK != sqlite3_bind_int64 (stmt, 2, ntohl (msg->hop_counter) ) + || SQLITE_OK != sqlite3_bind_blob (stmt, 3, (const void *) &msg->signature, + sizeof (msg->signature), SQLITE_STATIC) + || SQLITE_OK != sqlite3_bind_blob (stmt, 4, (const void *) &msg->purpose, + sizeof (msg->purpose), SQLITE_STATIC) + || SQLITE_OK != sqlite3_bind_int64 (stmt, 5, fragment_id) + || SQLITE_OK != sqlite3_bind_int64 (stmt, 6, fragment_offset) + || SQLITE_OK != sqlite3_bind_int64 (stmt, 7, message_id) + || SQLITE_OK != sqlite3_bind_int64 (stmt, 8, group_generation) + || SQLITE_OK != sqlite3_bind_int64 (stmt, 9, ntohl (msg->flags)) + || SQLITE_OK != sqlite3_bind_int64 (stmt, 10, psycstore_flags) + || SQLITE_OK != sqlite3_bind_blob (stmt, 11, (const void *) &msg[1], + ntohs (msg->header.size) + - sizeof (*msg), SQLITE_STATIC)) + { + LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, + "sqlite3_bind"); + } + else if (SQLITE_DONE != sqlite3_step (stmt)) + { + LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, + "sqlite3_step"); + } + + if (SQLITE_OK != sqlite3_reset (stmt)) + { + LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, + "sqlite3_reset"); + return GNUNET_SYSERR; + } + + return GNUNET_OK; +} + +/** + * Set additional flags for a given message. + * + * They are OR'd with any existing flags set. + * + * @return #GNUNET_OK on success, else #GNUNET_SYSERR + */ +static int +message_add_flags (void *cls, + const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, + uint64_t message_id, + uint32_t psycstore_flags) +{ + struct Plugin *plugin = cls; + sqlite3_stmt *stmt = plugin->update_message_flags; + int ret = GNUNET_SYSERR; + + if (SQLITE_OK != sqlite3_bind_int64 (stmt, 1, psycstore_flags) + || SQLITE_OK != sqlite3_bind_blob (stmt, 2, channel_key, + sizeof (*channel_key), SQLITE_STATIC) + || SQLITE_OK != sqlite3_bind_int64 (stmt, 3, message_id)) + { + LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, + "sqlite3_bind"); + } + else + { + switch (sqlite3_step (stmt)) + { + case SQLITE_DONE: + ret = sqlite3_total_changes (plugin->dbh) > 0 ? GNUNET_OK : GNUNET_NO; + break; + default: + LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, + "sqlite3_step"); + } + } + + if (SQLITE_OK != sqlite3_reset (stmt)) + { + LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, + "sqlite3_reset"); + return GNUNET_SYSERR; + } + + return ret; +} + +static int +fragment_row (sqlite3_stmt *stmt, GNUNET_PSYCSTORE_FragmentCallback cb, + void *cb_cls) +{ + int data_size = sqlite3_column_bytes (stmt, 9); + struct GNUNET_MULTICAST_MessageHeader *msg + = GNUNET_malloc (sizeof (*msg) + data_size); + + msg->header.size = htons (sizeof (*msg) + data_size); + msg->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE); + msg->hop_counter = htonl ((uint32_t) sqlite3_column_int64 (stmt, 0)); + GNUNET_memcpy (&msg->signature, + sqlite3_column_blob (stmt, 1), + sqlite3_column_bytes (stmt, 1)); + GNUNET_memcpy (&msg->purpose, + sqlite3_column_blob (stmt, 2), + sqlite3_column_bytes (stmt, 2)); + msg->fragment_id = GNUNET_htonll (sqlite3_column_int64 (stmt, 3)); + msg->fragment_offset = GNUNET_htonll (sqlite3_column_int64 (stmt, 4)); + msg->message_id = GNUNET_htonll (sqlite3_column_int64 (stmt, 5)); + msg->group_generation = GNUNET_htonll (sqlite3_column_int64 (stmt, 6)); + msg->flags = htonl (sqlite3_column_int64 (stmt, 7)); + GNUNET_memcpy (&msg[1], sqlite3_column_blob (stmt, 9), data_size); + + return cb (cb_cls, (void *) msg, sqlite3_column_int64 (stmt, 8)); +} + + +static int +fragment_select (struct Plugin *plugin, sqlite3_stmt *stmt, + uint64_t *returned_fragments, + GNUNET_PSYCSTORE_FragmentCallback cb, void *cb_cls) +{ + int ret = GNUNET_SYSERR; + int sql_ret; + + do + { + sql_ret = sqlite3_step (stmt); + switch (sql_ret) + { + case SQLITE_DONE: + if (ret != GNUNET_OK) + ret = GNUNET_NO; + break; + case SQLITE_ROW: + ret = fragment_row (stmt, cb, cb_cls); + (*returned_fragments)++; + if (ret != GNUNET_YES) + sql_ret = SQLITE_DONE; + break; + default: + LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, + "sqlite3_step"); + } + } + while (sql_ret == SQLITE_ROW); + + return ret; +} + +/** + * Retrieve a message fragment range by fragment ID. + * + * @see GNUNET_PSYCSTORE_fragment_get() + * + * @return #GNUNET_OK on success, else #GNUNET_SYSERR + */ +static int +fragment_get (void *cls, + const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, + uint64_t first_fragment_id, + uint64_t last_fragment_id, + uint64_t *returned_fragments, + GNUNET_PSYCSTORE_FragmentCallback cb, + void *cb_cls) +{ + struct Plugin *plugin = cls; + sqlite3_stmt *stmt = plugin->select_fragments; + int ret = GNUNET_SYSERR; + *returned_fragments = 0; + + if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key, + sizeof (*channel_key), + SQLITE_STATIC) + || SQLITE_OK != sqlite3_bind_int64 (stmt, 2, first_fragment_id) + || SQLITE_OK != sqlite3_bind_int64 (stmt, 3, last_fragment_id)) + { + LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, + "sqlite3_bind"); + } + else + { + ret = fragment_select (plugin, stmt, returned_fragments, cb, cb_cls); + } + + if (SQLITE_OK != sqlite3_reset (stmt)) + { + LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, + "sqlite3_reset"); + } + + return ret; +} + + +/** + * Retrieve a message fragment range by fragment ID. + * + * @see GNUNET_PSYCSTORE_fragment_get_latest() + * + * @return #GNUNET_OK on success, else #GNUNET_SYSERR + */ +static int +fragment_get_latest (void *cls, + const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, + uint64_t fragment_limit, + uint64_t *returned_fragments, + GNUNET_PSYCSTORE_FragmentCallback cb, + void *cb_cls) +{ + struct Plugin *plugin = cls; + sqlite3_stmt *stmt = plugin->select_latest_fragments; + int ret = GNUNET_SYSERR; + *returned_fragments = 0; + + if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key, + sizeof (*channel_key), + SQLITE_STATIC) + || SQLITE_OK != sqlite3_bind_int64 (stmt, 2, fragment_limit)) + { + LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, + "sqlite3_bind"); + } + else + { + ret = fragment_select (plugin, stmt, returned_fragments, cb, cb_cls); + } + + if (SQLITE_OK != sqlite3_reset (stmt)) + { + LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, + "sqlite3_reset"); + } + + return ret; +} + + +/** + * Retrieve all fragments of a message ID range. + * + * @see GNUNET_PSYCSTORE_message_get() + * + * @return #GNUNET_OK on success, else #GNUNET_SYSERR + */ +static int +message_get (void *cls, + const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, + uint64_t first_message_id, + uint64_t last_message_id, + uint64_t fragment_limit, + uint64_t *returned_fragments, + GNUNET_PSYCSTORE_FragmentCallback cb, + void *cb_cls) +{ + struct Plugin *plugin = cls; + sqlite3_stmt *stmt = plugin->select_messages; + int ret = GNUNET_SYSERR; + *returned_fragments = 0; + + if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key, + sizeof (*channel_key), + SQLITE_STATIC) + || SQLITE_OK != sqlite3_bind_int64 (stmt, 2, first_message_id) + || SQLITE_OK != sqlite3_bind_int64 (stmt, 3, last_message_id) + || SQLITE_OK != sqlite3_bind_int64 (stmt, 4, + (0 != fragment_limit) + ? fragment_limit + : INT64_MAX)) + { + LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, + "sqlite3_bind"); + } + else + { + ret = fragment_select (plugin, stmt, returned_fragments, cb, cb_cls); + } + + if (SQLITE_OK != sqlite3_reset (stmt)) + { + LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, + "sqlite3_reset"); + } + + return ret; +} + + +/** + * Retrieve all fragments of the latest messages. + * + * @see GNUNET_PSYCSTORE_message_get_latest() + * + * @return #GNUNET_OK on success, else #GNUNET_SYSERR + */ +static int +message_get_latest (void *cls, + const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, + uint64_t message_limit, + uint64_t *returned_fragments, + GNUNET_PSYCSTORE_FragmentCallback cb, + void *cb_cls) +{ + struct Plugin *plugin = cls; + sqlite3_stmt *stmt = plugin->select_latest_messages; + int ret = GNUNET_SYSERR; + *returned_fragments = 0; + + if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key, + sizeof (*channel_key), + SQLITE_STATIC) + || SQLITE_OK != sqlite3_bind_blob (stmt, 2, channel_key, + sizeof (*channel_key), + SQLITE_STATIC) + || SQLITE_OK != sqlite3_bind_int64 (stmt, 3, message_limit)) + { + LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, + "sqlite3_bind"); + } + else + { + ret = fragment_select (plugin, stmt, returned_fragments, cb, cb_cls); + } + + if (SQLITE_OK != sqlite3_reset (stmt)) + { + LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, + "sqlite3_reset"); + } + + return ret; +} + + +/** + * Retrieve a fragment of message specified by its message ID and fragment + * offset. + * + * @see GNUNET_PSYCSTORE_message_get_fragment() + * + * @return #GNUNET_OK on success, else #GNUNET_SYSERR + */ +static int +message_get_fragment (void *cls, + const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, + uint64_t message_id, + uint64_t fragment_offset, + GNUNET_PSYCSTORE_FragmentCallback cb, + void *cb_cls) +{ + struct Plugin *plugin = cls; + sqlite3_stmt *stmt = plugin->select_message_fragment; + int ret = GNUNET_SYSERR; + + if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key, + sizeof (*channel_key), + SQLITE_STATIC) + || SQLITE_OK != sqlite3_bind_int64 (stmt, 2, message_id) + || SQLITE_OK != sqlite3_bind_int64 (stmt, 3, fragment_offset)) + { + LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, + "sqlite3_bind"); + } + else + { + switch (sqlite3_step (stmt)) + { + case SQLITE_DONE: + ret = GNUNET_NO; + break; + case SQLITE_ROW: + ret = fragment_row (stmt, cb, cb_cls); + break; + default: + LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, + "sqlite3_step"); + } + } + + if (SQLITE_OK != sqlite3_reset (stmt)) + { + LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, + "sqlite3_reset"); + } + + return ret; +} + +/** + * Retrieve the max. values of message counters for a channel. + * + * @see GNUNET_PSYCSTORE_counters_get() + * + * @return #GNUNET_OK on success, else #GNUNET_SYSERR + */ +static int +counters_message_get (void *cls, + const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, + uint64_t *max_fragment_id, + uint64_t *max_message_id, + uint64_t *max_group_generation) +{ + struct Plugin *plugin = cls; + sqlite3_stmt *stmt = plugin->select_counters_message; + int ret = GNUNET_SYSERR; + + if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key, + sizeof (*channel_key), + SQLITE_STATIC)) + { + LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, + "sqlite3_bind"); + } + else + { + switch (sqlite3_step (stmt)) + { + case SQLITE_DONE: + ret = GNUNET_NO; + break; + case SQLITE_ROW: + *max_fragment_id = sqlite3_column_int64 (stmt, 0); + *max_message_id = sqlite3_column_int64 (stmt, 1); + *max_group_generation = sqlite3_column_int64 (stmt, 2); + ret = GNUNET_OK; + break; + default: + LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, + "sqlite3_step"); + } + } + + if (SQLITE_OK != sqlite3_reset (stmt)) + { + LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, + "sqlite3_reset"); + } + + return ret; +} + +/** + * Retrieve the max. values of state counters for a channel. + * + * @see GNUNET_PSYCSTORE_counters_get() + * + * @return #GNUNET_OK on success, else #GNUNET_SYSERR + */ +static int +counters_state_get (void *cls, + const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, + uint64_t *max_state_message_id) +{ + struct Plugin *plugin = cls; + sqlite3_stmt *stmt = plugin->select_counters_state; + int ret = GNUNET_SYSERR; + + if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key, + sizeof (*channel_key), + SQLITE_STATIC)) + { + LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, + "sqlite3_bind"); + } + else + { + switch (sqlite3_step (stmt)) + { + case SQLITE_DONE: + ret = GNUNET_NO; + break; + case SQLITE_ROW: + *max_state_message_id = sqlite3_column_int64 (stmt, 0); + ret = GNUNET_OK; + break; + default: + LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, + "sqlite3_step"); + } + } + + if (SQLITE_OK != sqlite3_reset (stmt)) + { + LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, + "sqlite3_reset"); + } + + return ret; +} + + +/** + * Assign a value to a state variable. + * + * @return #GNUNET_OK on success, else #GNUNET_SYSERR + */ +static int +state_assign (struct Plugin *plugin, sqlite3_stmt *stmt, + const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, + const char *name, const void *value, size_t value_size) +{ + int ret = GNUNET_SYSERR; + + if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key, + sizeof (*channel_key), SQLITE_STATIC) + || SQLITE_OK != sqlite3_bind_text (stmt, 2, name, -1, SQLITE_STATIC) + || SQLITE_OK != sqlite3_bind_blob (stmt, 3, value, value_size, + SQLITE_STATIC)) + { + LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, + "sqlite3_bind"); + } + else + { + switch (sqlite3_step (stmt)) + { + case SQLITE_DONE: + ret = 0 < sqlite3_total_changes (plugin->dbh) ? GNUNET_OK : GNUNET_NO; + break; + default: + LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, + "sqlite3_step"); + } + } + + if (SQLITE_OK != sqlite3_reset (stmt)) + { + LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, + "sqlite3_reset"); + return GNUNET_SYSERR; + } + + return ret; +} + + +static int +update_message_id (struct Plugin *plugin, sqlite3_stmt *stmt, + const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, + uint64_t message_id) +{ + if (SQLITE_OK != sqlite3_bind_int64 (stmt, 1, message_id) + || SQLITE_OK != sqlite3_bind_blob (stmt, 2, channel_key, + sizeof (*channel_key), SQLITE_STATIC)) + { + LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, + "sqlite3_bind"); + } + else if (SQLITE_DONE != sqlite3_step (stmt)) + { + LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, + "sqlite3_step"); + } + if (SQLITE_OK != sqlite3_reset (stmt)) + { + LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, + "sqlite3_reset"); + return GNUNET_SYSERR; + } + return GNUNET_OK; +} + + +/** + * Begin modifying current state. + */ +static int +state_modify_begin (void *cls, + const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, + uint64_t message_id, uint64_t state_delta) +{ + struct Plugin *plugin = cls; + + if (state_delta > 0) + { + /** + * We can only apply state modifiers in the current message if modifiers in + * the previous stateful message (message_id - state_delta) were already + * applied. + */ + + uint64_t max_state_message_id = 0; + int ret = counters_state_get (plugin, channel_key, &max_state_message_id); + switch (ret) + { + case GNUNET_OK: + case GNUNET_NO: // no state yet + ret = GNUNET_OK; + break; + default: + return ret; + } + + if (max_state_message_id < message_id - state_delta) + return GNUNET_NO; /* some stateful messages not yet applied */ + else if (message_id - state_delta < max_state_message_id) + return GNUNET_NO; /* changes already applied */ + } + + if (TRANSACTION_NONE != plugin->transaction) + { + /** @todo FIXME: wait for other transaction to finish */ + return GNUNET_SYSERR; + } + return transaction_begin (plugin, TRANSACTION_STATE_MODIFY); +} + + +/** + * Set the current value of state variable. + * + * @see GNUNET_PSYCSTORE_state_modify() + * + * @return #GNUNET_OK on success, else #GNUNET_SYSERR + */ +static int +state_modify_op (void *cls, + const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, + enum GNUNET_PSYC_Operator op, + const char *name, const void *value, size_t value_size) +{ + struct Plugin *plugin = cls; + GNUNET_assert (TRANSACTION_STATE_MODIFY == plugin->transaction); + + switch (op) + { + case GNUNET_PSYC_OP_ASSIGN: + return state_assign (plugin, plugin->insert_state_current, channel_key, + name, value, value_size); + + default: /** @todo implement more state operations */ + GNUNET_break (0); + return GNUNET_SYSERR; + } +} + + +/** + * End modifying current state. + */ +static int +state_modify_end (void *cls, + const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, + uint64_t message_id) +{ + struct Plugin *plugin = cls; + GNUNET_assert (TRANSACTION_STATE_MODIFY == plugin->transaction); + + return + GNUNET_OK == exec_channel (plugin, plugin->delete_state_empty, channel_key) + && GNUNET_OK == update_message_id (plugin, + plugin->update_max_state_message_id, + channel_key, message_id) + && GNUNET_OK == transaction_commit (plugin) + ? GNUNET_OK : GNUNET_SYSERR; +} + + +/** + * Begin state synchronization. + */ +static int +state_sync_begin (void *cls, + const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key) +{ + struct Plugin *plugin = cls; + return exec_channel (plugin, plugin->delete_state_sync, channel_key); +} + + +/** + * Assign current value of a state variable. + * + * @see GNUNET_PSYCSTORE_state_modify() + * + * @return #GNUNET_OK on success, else #GNUNET_SYSERR + */ +static int +state_sync_assign (void *cls, + const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, + const char *name, const void *value, size_t value_size) +{ + struct Plugin *plugin = cls; + return state_assign (cls, plugin->insert_state_sync, channel_key, + name, value, value_size); +} + + +/** + * End modifying current state. + */ +static int +state_sync_end (void *cls, + const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, + uint64_t max_state_message_id, + uint64_t state_hash_message_id) +{ + struct Plugin *plugin = cls; + int ret = GNUNET_SYSERR; + + if (TRANSACTION_NONE != plugin->transaction) + { + /** @todo FIXME: wait for other transaction to finish */ + return GNUNET_SYSERR; + } + + GNUNET_OK == transaction_begin (plugin, TRANSACTION_STATE_SYNC) + && GNUNET_OK == exec_channel (plugin, plugin->delete_state, channel_key) + && GNUNET_OK == exec_channel (plugin, plugin->insert_state_from_sync, + channel_key) + && GNUNET_OK == exec_channel (plugin, plugin->delete_state_sync, + channel_key) + && GNUNET_OK == update_message_id (plugin, + plugin->update_state_hash_message_id, + channel_key, state_hash_message_id) + && GNUNET_OK == update_message_id (plugin, + plugin->update_max_state_message_id, + channel_key, max_state_message_id) + && GNUNET_OK == transaction_commit (plugin) + ? ret = GNUNET_OK + : transaction_rollback (plugin); + return ret; +} + + +/** + * Delete the whole state. + * + * @see GNUNET_PSYCSTORE_state_reset() + * + * @return #GNUNET_OK on success, else #GNUNET_SYSERR + */ +static int +state_reset (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key) +{ + struct Plugin *plugin = cls; + return exec_channel (plugin, plugin->delete_state, channel_key); +} + + +/** + * Update signed values of state variables in the state store. + * + * @see GNUNET_PSYCSTORE_state_hash_update() + * + * @return #GNUNET_OK on success, else #GNUNET_SYSERR + */ +static int +state_update_signed (void *cls, + const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key) +{ + struct Plugin *plugin = cls; + return exec_channel (plugin, plugin->update_state_signed, channel_key); +} + + +/** + * Retrieve a state variable by name. + * + * @see GNUNET_PSYCSTORE_state_get() + * + * @return #GNUNET_OK on success, else #GNUNET_SYSERR + */ +static int +state_get (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, + const char *name, GNUNET_PSYCSTORE_StateCallback cb, void *cb_cls) +{ + struct Plugin *plugin = cls; + int ret = GNUNET_SYSERR; + + sqlite3_stmt *stmt = plugin->select_state_one; + + if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key, + sizeof (*channel_key), + SQLITE_STATIC) + || SQLITE_OK != sqlite3_bind_text (stmt, 2, name, -1, SQLITE_STATIC)) + { + LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, + "sqlite3_bind"); + } + else + { + switch (sqlite3_step (stmt)) + { + case SQLITE_DONE: + ret = GNUNET_NO; + break; + case SQLITE_ROW: + ret = cb (cb_cls, name, sqlite3_column_blob (stmt, 0), + sqlite3_column_bytes (stmt, 0)); + break; + default: + LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, + "sqlite3_step"); + } + } + + if (SQLITE_OK != sqlite3_reset (stmt)) + { + LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, + "sqlite3_reset"); + } + + return ret; +} + + +/** + * Retrieve all state variables for a channel with the given prefix. + * + * @see GNUNET_PSYCSTORE_state_get_prefix() + * + * @return #GNUNET_OK on success, else #GNUNET_SYSERR + */ +static int +state_get_prefix (void *cls, const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, + const char *name, GNUNET_PSYCSTORE_StateCallback cb, + void *cb_cls) +{ + struct Plugin *plugin = cls; + int ret = GNUNET_SYSERR; + sqlite3_stmt *stmt = plugin->select_state_prefix; + size_t name_len = strlen (name); + + if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key, + sizeof (*channel_key), SQLITE_STATIC) + || SQLITE_OK != sqlite3_bind_text (stmt, 2, name, name_len, SQLITE_STATIC) + || SQLITE_OK != sqlite3_bind_int (stmt, 3, name_len) + || SQLITE_OK != sqlite3_bind_text (stmt, 4, name, name_len, SQLITE_STATIC)) + { + LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, + "sqlite3_bind"); + } + else + { + int sql_ret; + do + { + sql_ret = sqlite3_step (stmt); + switch (sql_ret) + { + case SQLITE_DONE: + if (ret != GNUNET_OK) + ret = GNUNET_NO; + break; + case SQLITE_ROW: + ret = cb (cb_cls, (const char *) sqlite3_column_text (stmt, 0), + sqlite3_column_blob (stmt, 1), + sqlite3_column_bytes (stmt, 1)); + if (ret != GNUNET_YES) + sql_ret = SQLITE_DONE; + break; + default: + LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, + "sqlite3_step"); + } + } + while (sql_ret == SQLITE_ROW); + } + if (SQLITE_OK != sqlite3_reset (stmt)) + { + LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, + "sqlite3_reset"); + } + return ret; +} + + +/** + * Retrieve all signed state variables for a channel. + * + * @see GNUNET_PSYCSTORE_state_get_signed() + * + * @return #GNUNET_OK on success, else #GNUNET_SYSERR + */ +static int +state_get_signed (void *cls, + const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, + GNUNET_PSYCSTORE_StateCallback cb, void *cb_cls) +{ + struct Plugin *plugin = cls; + int ret = GNUNET_SYSERR; + + sqlite3_stmt *stmt = plugin->select_state_signed; + + if (SQLITE_OK != sqlite3_bind_blob (stmt, 1, channel_key, + sizeof (*channel_key), SQLITE_STATIC)) + { + LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, + "sqlite3_bind"); + } + else + { + int sql_ret; + do + { + sql_ret = sqlite3_step (stmt); + switch (sql_ret) + { + case SQLITE_DONE: + if (ret != GNUNET_OK) + ret = GNUNET_NO; + break; + case SQLITE_ROW: + ret = cb (cb_cls, (const char *) sqlite3_column_text (stmt, 0), + sqlite3_column_blob (stmt, 1), + sqlite3_column_bytes (stmt, 1)); + if (ret != GNUNET_YES) + sql_ret = SQLITE_DONE; + break; + default: + LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, + "sqlite3_step"); + } + } + while (sql_ret == SQLITE_ROW); + } + + if (SQLITE_OK != sqlite3_reset (stmt)) + { + LOG_SQLITE (plugin, GNUNET_ERROR_TYPE_ERROR | GNUNET_ERROR_TYPE_BULK, + "sqlite3_reset"); + } + + return ret; +} + + +/** + * Entry point for the plugin. + * + * @param cls The struct GNUNET_CONFIGURATION_Handle. + * @return NULL on error, otherwise the plugin context + */ +void * +libgnunet_plugin_psycstore_sqlite_init (void *cls) +{ + static struct Plugin plugin; + const struct GNUNET_CONFIGURATION_Handle *cfg = cls; + struct GNUNET_PSYCSTORE_PluginFunctions *api; + + if (NULL != plugin.cfg) + return NULL; /* can only initialize once! */ + memset (&plugin, 0, sizeof (struct Plugin)); + plugin.cfg = cfg; + if (GNUNET_OK != database_setup (&plugin)) + { + database_shutdown (&plugin); + return NULL; + } + api = GNUNET_new (struct GNUNET_PSYCSTORE_PluginFunctions); + api->cls = &plugin; + api->membership_store = &sqlite_membership_store; + api->membership_test = &membership_test; + api->fragment_store = &fragment_store; + api->message_add_flags = &message_add_flags; + api->fragment_get = &fragment_get; + api->fragment_get_latest = &fragment_get_latest; + api->message_get = &message_get; + api->message_get_latest = &message_get_latest; + api->message_get_fragment = &message_get_fragment; + api->counters_message_get = &counters_message_get; + api->counters_state_get = &counters_state_get; + api->state_modify_begin = &state_modify_begin; + api->state_modify_op = &state_modify_op; + api->state_modify_end = &state_modify_end; + api->state_sync_begin = &state_sync_begin; + api->state_sync_assign = &state_sync_assign; + api->state_sync_end = &state_sync_end; + api->state_reset = &state_reset; + api->state_update_signed = &state_update_signed; + api->state_get = &state_get; + api->state_get_prefix = &state_get_prefix; + api->state_get_signed = &state_get_signed; + + LOG (GNUNET_ERROR_TYPE_INFO, _("SQLite database running\n")); + return api; +} + + +/** + * Exit point from the plugin. + * + * @param cls The plugin context (as returned by "init") + * @return Always NULL + */ +void * +libgnunet_plugin_psycstore_sqlite_done (void *cls) +{ + struct GNUNET_PSYCSTORE_PluginFunctions *api = cls; + struct Plugin *plugin = api->cls; + + database_shutdown (plugin); + plugin->cfg = NULL; + GNUNET_free (api); + LOG (GNUNET_ERROR_TYPE_DEBUG, "SQLite plugin is finished\n"); + return NULL; +} + +/* end of plugin_psycstore_sqlite.c */ diff --git a/src/psycstore/psycstore.conf.in b/src/psycstore/psycstore.conf.in new file mode 100644 index 0000000..3905db1 --- /dev/null +++ b/src/psycstore/psycstore.conf.in @@ -0,0 +1,28 @@ +[psycstore] +START_ON_DEMAND = @START_ON_DEMAND@ +BINARY = gnunet-service-psycstore + +UNIXPATH = $GNUNET_RUNTIME_DIR/gnunet-service-psycstore.sock +UNIX_MATCH_UID = YES +UNIX_MATCH_GID = YES + +@UNIXONLY@PORT = 2111 +HOSTNAME = localhost +ACCEPT_FROM = 127.0.0.1; +ACCEPT_FROM6 = ::1; + +DATABASE = sqlite + +[psycstore-sqlite] +FILENAME = $GNUNET_DATA_HOME/psycstore/sqlite.db + +[psycstore-mysql] +DATABASE = gnunet +CONFIG = ~/.my.cnf +# USER = gnunet +# PASSWORD = +# HOST = localhost +# PORT = 3306 + +[psycstore-postgres] +CONFIG = connect_timeout=10; dbname=gnunet diff --git a/src/psycstore/psycstore.h b/src/psycstore/psycstore.h new file mode 100644 index 0000000..9a1c06a --- /dev/null +++ b/src/psycstore/psycstore.h @@ -0,0 +1,520 @@ +/* + * This file is part of GNUnet + * Copyright (C) 2013 GNUnet e.V. + * + * GNUnet is free software: you can redistribute it and/or modify it + * under the terms of the GNU Affero General Public License as published + * by the Free Software Foundation, either version 3 of the License, + * or (at your option) any later version. + * + * GNUnet is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + + SPDX-License-Identifier: AGPL3.0-or-later + */ + +/** + * @file psycstore/psycstore.h + * @brief Common type definitions for the PSYCstore service and API. + * @author Gabor X Toth + */ + +#ifndef GNUNET_PSYCSTORE_H +#define GNUNET_PSYCSTORE_H + +#include "gnunet_common.h" + + +GNUNET_NETWORK_STRUCT_BEGIN + +/** + * Answer from service to client about last operation. + */ +struct OperationResult +{ + /** + * Type: GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_CODE + */ + struct GNUNET_MessageHeader header; + + uint32_t reserved GNUNET_PACKED; + + /** + * Operation ID. + */ + uint64_t op_id GNUNET_PACKED; + + /**lowed by + * Status code for the operation. + */ + uint64_t result_code GNUNET_PACKED; + + /* followed by 0-terminated error message (on error) */ + +}; + + +/** + * Answer from service to client about master counters. + * + * @see GNUNET_PSYCSTORE_counters_get() + */ +struct CountersResult +{ + /** + * Type: GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_COUNTERS + */ + struct GNUNET_MessageHeader header; + + /** + * Status code for the operation: + * #GNUNET_OK: success, counter values are returned. + * #GNUNET_NO: no message has been sent to the channel yet. + * #GNUNET_SYSERR: an error occurred. + */ + uint32_t result_code GNUNET_PACKED; + + /** + * Operation ID. + */ + uint64_t op_id GNUNET_PACKED; + + uint64_t max_fragment_id GNUNET_PACKED; + + uint64_t max_message_id GNUNET_PACKED; + + uint64_t max_group_generation GNUNET_PACKED; + + uint64_t max_state_message_id GNUNET_PACKED; +}; + + +/** + * Answer from service to client containing a message fragment. + */ +struct FragmentResult +{ + /** + * Type: GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_CODE + */ + struct GNUNET_MessageHeader header; + + uint32_t psycstore_flags GNUNET_PACKED; + + /** + * Operation ID. + */ + uint64_t op_id GNUNET_PACKED; + + /* Followed by GNUNET_MULTICAST_MessageHeader */ +}; + + +/** + * Answer from service to client containing a state variable. + */ +struct StateResult +{ + /** + * Type: GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_CODE + */ + struct GNUNET_MessageHeader header; + + uint16_t name_size GNUNET_PACKED; + + uint16_t reserved GNUNET_PACKED; + + /** + * Operation ID. + */ + uint64_t op_id GNUNET_PACKED; + + /* Followed by name and value */ +}; + + +/** + * Generic operation request. + */ +struct OperationRequest +{ + struct GNUNET_MessageHeader header; + + uint32_t reserved GNUNET_PACKED; + + /** + * Operation ID. + */ + uint64_t op_id GNUNET_PACKED; + + struct GNUNET_CRYPTO_EddsaPublicKey channel_key; +}; + + +/** + * @see GNUNET_PSYCSTORE_membership_store() + */ +struct MembershipStoreRequest +{ + /** + * Type: GNUNET_MESSAGE_TYPE_PSYCSTORE_MEMBERSHIP_STORE + */ + struct GNUNET_MessageHeader header; + + uint32_t reserved GNUNET_PACKED; + + /** + * Operation ID. + */ + uint64_t op_id GNUNET_PACKED; + + /** + * Channel's public key. + */ + struct GNUNET_CRYPTO_EddsaPublicKey channel_key; + + /** + * Slave's public key. + */ + struct GNUNET_CRYPTO_EcdsaPublicKey slave_key; + + uint64_t announced_at GNUNET_PACKED; + uint64_t effective_since GNUNET_PACKED; + uint64_t group_generation GNUNET_PACKED; + uint8_t did_join; +}; + + +/** + * @see GNUNET_PSYCSTORE_membership_test() + */ +struct MembershipTestRequest +{ + /** + * Type: GNUNET_MESSAGE_TYPE_PSYCSTORE_MEMBERSHIP_TEST + */ + struct GNUNET_MessageHeader header; + + uint32_t reserved GNUNET_PACKED; + + /** + * Operation ID. + */ + uint64_t op_id GNUNET_PACKED; + + /** + * Channel's public key. + */ + struct GNUNET_CRYPTO_EddsaPublicKey channel_key; + + /** + * Slave's public key. + */ + struct GNUNET_CRYPTO_EcdsaPublicKey slave_key; + + uint64_t message_id GNUNET_PACKED; + + uint64_t group_generation GNUNET_PACKED; +}; + + +/** + * @see GNUNET_PSYCSTORE_fragment_store() + */ +struct FragmentStoreRequest +{ + /** + * Type: GNUNET_MESSAGE_TYPE_PSYCSTORE_FRAGMENT_STORE + */ + struct GNUNET_MessageHeader header; + + /** + * enum GNUNET_PSYCSTORE_MessageFlags + */ + uint32_t psycstore_flags GNUNET_PACKED; + + /** + * Channel's public key. + */ + struct GNUNET_CRYPTO_EddsaPublicKey channel_key; + + /** + * Operation ID. + */ + uint64_t op_id; + + /* Followed by fragment */ +}; + + +/** + * @see GNUNET_PSYCSTORE_fragment_get() + */ +struct FragmentGetRequest +{ + /** + * Type: GNUNET_MESSAGE_TYPE_PSYCSTORE_FRAGMENT_GET + */ + struct GNUNET_MessageHeader header; + + uint32_t reserved GNUNET_PACKED; + + /** + * Operation ID. + */ + uint64_t op_id GNUNET_PACKED; + + /** + * Channel's public key. + */ + struct GNUNET_CRYPTO_EddsaPublicKey channel_key; + + /** + * Slave's public key. + */ + struct GNUNET_CRYPTO_EcdsaPublicKey slave_key; + + /** + * First fragment ID to request. + */ + uint64_t first_fragment_id GNUNET_PACKED; + + /** + * Last fragment ID to request. + */ + uint64_t last_fragment_id GNUNET_PACKED; + + /** + * Maximum number of fragments to retrieve. + */ + uint64_t fragment_limit GNUNET_PACKED; + + /** + * Do membership test with @a slave_key before returning fragment? + * #GNUNET_YES or #GNUNET_NO + */ + uint8_t do_membership_test; +}; + + +/** + * @see GNUNET_PSYCSTORE_message_get() + */ +struct MessageGetRequest +{ + /** + * Type: GNUNET_MESSAGE_TYPE_PSYCSTORE_MESSAGE_GET + */ + struct GNUNET_MessageHeader header; + + uint32_t reserved GNUNET_PACKED; + + /** + * Operation ID. + */ + uint64_t op_id GNUNET_PACKED; + + /** + * Channel's public key. + */ + struct GNUNET_CRYPTO_EddsaPublicKey channel_key; + + /** + * Slave's public key. + */ + struct GNUNET_CRYPTO_EcdsaPublicKey slave_key; + + /** + * First message ID to request. + */ + uint64_t first_message_id GNUNET_PACKED; + + /** + * Last message ID to request. + */ + uint64_t last_message_id GNUNET_PACKED; + + /** + * Maximum number of messages to retrieve. + */ + uint64_t message_limit GNUNET_PACKED; + + /** + * Maximum number of fragments to retrieve. + */ + uint64_t fragment_limit GNUNET_PACKED; + + /** + * Do membership test with @a slave_key before returning fragment? + * #GNUNET_YES or #GNUNET_NO + */ + uint8_t do_membership_test; + + /* Followed by method_prefix */ +}; + + +/** + * @see GNUNET_PSYCSTORE_message_get_fragment() + */ +struct MessageGetFragmentRequest +{ + /** + * Type: GNUNET_MESSAGE_TYPE_PSYCSTORE_MESSAGE_FRAGMENT_GET + */ + struct GNUNET_MessageHeader header; + + uint32_t reserved GNUNET_PACKED; + + /** + * Operation ID. + */ + uint64_t op_id GNUNET_PACKED; + + /** + * Channel's public key. + */ + struct GNUNET_CRYPTO_EddsaPublicKey channel_key; + + /** + * Slave's public key. + */ + struct GNUNET_CRYPTO_EcdsaPublicKey slave_key; + + /** + * Requested message ID. + */ + uint64_t message_id GNUNET_PACKED; + + /** + * Requested fragment offset. + */ + uint64_t fragment_offset GNUNET_PACKED; + + /** + * Do membership test with @a slave_key before returning fragment? + * #GNUNET_YES or #GNUNET_NO + */ + uint8_t do_membership_test; +}; + + +/** + * @see GNUNET_PSYCSTORE_state_hash_update() + */ +struct StateHashUpdateRequest +{ + /** + * Type: GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_HASH_UPDATE + */ + struct GNUNET_MessageHeader header; + + uint32_t reserved GNUNET_PACKED; + + /** + * Operation ID. + */ + uint64_t op_id GNUNET_PACKED; + + /** + * Channel's public key. + */ + struct GNUNET_CRYPTO_EddsaPublicKey channel_key; + + struct GNUNET_HashCode hash; +}; + + +enum StateOpFlags +{ + STATE_OP_FIRST = 1 << 0, + STATE_OP_LAST = 1 << 1 +}; + + +/** + * @see GNUNET_PSYCSTORE_state_modify() + */ +struct StateModifyRequest +{ + /** + * Type: GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_MODIFY + */ + struct GNUNET_MessageHeader header; + + /** + * Operation ID. + */ + uint64_t op_id GNUNET_PACKED; + + /** + * ID of the message to apply the state changes in. + */ + uint64_t message_id GNUNET_PACKED; + + /** + * State delta of the message with ID @a message_id. + */ + uint64_t state_delta GNUNET_PACKED; + + /** + * Channel's public key. + */ + struct GNUNET_CRYPTO_EddsaPublicKey channel_key; +}; + + +/** + * @see GNUNET_PSYCSTORE_state_sync() + */ +struct StateSyncRequest +{ + /** + * Type: GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_SYNC + */ + struct GNUNET_MessageHeader header; + + /** + * Size of name, including NUL terminator. + */ + uint16_t name_size GNUNET_PACKED; + + /** + * OR'd StateOpFlags + */ + uint8_t flags; + + uint8_t reserved; + + /** + * Operation ID. + */ + uint64_t op_id GNUNET_PACKED; + + /** + * ID of the message that contains the state_hash PSYC header variable. + */ + uint64_t state_hash_message_id GNUNET_PACKED; + + /** + * ID of the last stateful message before @a state_hash_message_id. + */ + uint64_t max_state_message_id GNUNET_PACKED; + + /** + * Channel's public key. + */ + struct GNUNET_CRYPTO_EddsaPublicKey channel_key; + + /* Followed by NUL-terminated name, then the value. */ +}; + + +GNUNET_NETWORK_STRUCT_END + +#endif diff --git a/src/psycstore/psycstore_api.c b/src/psycstore/psycstore_api.c new file mode 100644 index 0000000..ab4cd0f --- /dev/null +++ b/src/psycstore/psycstore_api.c @@ -0,0 +1,1285 @@ +/* + * This file is part of GNUnet + * Copyright (C) 2013 GNUnet e.V. + * + * GNUnet is free software: you can redistribute it and/or modify it + * under the terms of the GNU Affero General Public License as published + * by the Free Software Foundation, either version 3 of the License, + * or (at your option) any later version. + * + * GNUnet is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + + SPDX-License-Identifier: AGPL3.0-or-later + */ + +/** + * @file psycstore/psycstore_api.c + * @brief API to interact with the PSYCstore service + * @author Gabor X Toth + * @author Christian Grothoff + */ + +#include <inttypes.h> + +#include "platform.h" +#include "gnunet_util_lib.h" +#include "gnunet_constants.h" +#include "gnunet_protocols.h" +#include "gnunet_psycstore_service.h" +#include "gnunet_multicast_service.h" +#include "psycstore.h" + +#define LOG(kind,...) GNUNET_log_from (kind, "psycstore-api",__VA_ARGS__) + +/** + * Handle for an operation with the PSYCstore service. + */ +struct GNUNET_PSYCSTORE_OperationHandle +{ + + /** + * Main PSYCstore handle. + */ + struct GNUNET_PSYCSTORE_Handle *h; + + /** + * Data callbacks. + */ + union { + GNUNET_PSYCSTORE_FragmentCallback fragment_cb; + GNUNET_PSYCSTORE_CountersCallback counters_cb; + GNUNET_PSYCSTORE_StateCallback state_cb; + }; + + /** + * Closure for callbacks. + */ + void *cls; + + /** + * Message envelope. + */ + struct GNUNET_MQ_Envelope *env; + + /** + * Operation ID. + */ + uint64_t op_id; +}; + + +/** + * Handle for the service. + */ +struct GNUNET_PSYCSTORE_Handle +{ + /** + * Configuration to use. + */ + const struct GNUNET_CONFIGURATION_Handle *cfg; + + /** + * Client connection. + */ + struct GNUNET_MQ_Handle *mq; + + /** + * Async operations. + */ + struct GNUNET_OP_Handle *op; + + /** + * Task doing exponential back-off trying to reconnect. + */ + struct GNUNET_SCHEDULER_Task *reconnect_task; + + /** + * Delay for next connect retry. + */ + struct GNUNET_TIME_Relative reconnect_delay; + + + GNUNET_PSYCSTORE_FragmentCallback *fragment_cb; + + GNUNET_PSYCSTORE_CountersCallback *counters_cb; + + GNUNET_PSYCSTORE_StateCallback *state_cb; + /** + * Closure for callbacks. + */ + void *cb_cls; +}; + + +static int +check_result_code (void *cls, const struct OperationResult *opres) +{ + uint16_t size = ntohs (opres->header.size); + const char *str = (const char *) &opres[1]; + if ( (sizeof (*opres) < size) && + ('\0' != str[size - sizeof (*opres) - 1]) ) + { + GNUNET_break (0); + return GNUNET_SYSERR; + } + + return GNUNET_OK; +} + + +static void +handle_result_code (void *cls, const struct OperationResult *opres) +{ + struct GNUNET_PSYCSTORE_Handle *h = cls; + struct GNUNET_PSYCSTORE_OperationHandle *op = NULL; + uint16_t size = ntohs (opres->header.size); + + const char * + str = (sizeof (*opres) < size) ? (const char *) &opres[1] : ""; + + if (GNUNET_YES == GNUNET_OP_result (h->op, GNUNET_ntohll (opres->op_id), + GNUNET_ntohll (opres->result_code) + INT64_MIN, + str, size - sizeof (*opres), (void **) &op)) + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "handle_result_code: Received result message with OP ID: %" PRIu64 "\n", + GNUNET_ntohll (opres->op_id)); + GNUNET_free (op); + } + else + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "handle_result_code: No callback registered for OP ID %" PRIu64 ".\n", + GNUNET_ntohll (opres->op_id)); + } + h->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS; +} + + +static void +handle_result_counters (void *cls, const struct CountersResult *cres) +{ + struct GNUNET_PSYCSTORE_Handle *h = cls; + struct GNUNET_PSYCSTORE_OperationHandle *op = NULL; + + if (GNUNET_YES == GNUNET_OP_get (h->op, GNUNET_ntohll (cres->op_id), + NULL, NULL, (void **) &op)) + { + GNUNET_assert (NULL != op); + if (NULL != op->counters_cb) + { + op->counters_cb (op->cls, + ntohl (cres->result_code), + GNUNET_ntohll (cres->max_fragment_id), + GNUNET_ntohll (cres->max_message_id), + GNUNET_ntohll (cres->max_group_generation), + GNUNET_ntohll (cres->max_state_message_id)); + } + GNUNET_OP_remove (h->op, GNUNET_ntohll (cres->op_id)); + GNUNET_free (op); + } + else + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "handle_result_counters: No callback registered for OP ID %" PRIu64 ".\n", + GNUNET_ntohll (cres->op_id)); + } + h->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS; +} + + +static int +check_result_fragment (void *cls, const struct FragmentResult *fres) +{ + uint16_t size = ntohs (fres->header.size); + struct GNUNET_MULTICAST_MessageHeader *mmsg = + (struct GNUNET_MULTICAST_MessageHeader *) &fres[1]; + if (sizeof (*fres) + sizeof (*mmsg) < size + && sizeof (*fres) + ntohs (mmsg->header.size) != size) + { + LOG (GNUNET_ERROR_TYPE_ERROR, + "check_result_fragment: Received message with invalid length %lu bytes.\n", + size, sizeof (*fres)); + GNUNET_break (0); + return GNUNET_SYSERR; + } + return GNUNET_OK; +} + + +static void +handle_result_fragment (void *cls, const struct FragmentResult *fres) +{ + struct GNUNET_PSYCSTORE_Handle *h = cls; + struct GNUNET_PSYCSTORE_OperationHandle *op = NULL; + + if (GNUNET_YES == GNUNET_OP_get (h->op, GNUNET_ntohll (fres->op_id), + NULL, NULL, (void **) &op)) + { + GNUNET_assert (NULL != op); + if (NULL != op->fragment_cb) + op->fragment_cb (op->cls, + (struct GNUNET_MULTICAST_MessageHeader *) &fres[1], + ntohl (fres->psycstore_flags)); + //GNUNET_OP_remove (h->op, GNUNET_ntohll (fres->op_id)); + //GNUNET_free (op); + } + else + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "handle_result_fragment: No callback registered for OP ID %" PRIu64 ".\n", + GNUNET_ntohll (fres->op_id)); + } + h->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS; +} + + +static int +check_result_state (void *cls, const struct StateResult *sres) +{ + const char *name = (const char *) &sres[1]; + uint16_t size = ntohs (sres->header.size); + uint16_t name_size = ntohs (sres->name_size); + + if (name_size <= 2 + || size - sizeof (*sres) < name_size + || '\0' != name[name_size - 1]) + { + LOG (GNUNET_ERROR_TYPE_ERROR, + "check_result_state: Received state result message with invalid name.\n"); + GNUNET_break (0); + return GNUNET_SYSERR; + } + return GNUNET_OK; +} + + +static void +handle_result_state (void *cls, const struct StateResult *sres) +{ + struct GNUNET_PSYCSTORE_Handle *h = cls; + struct GNUNET_PSYCSTORE_OperationHandle *op = NULL; + + const char *name = (const char *) &sres[1]; + uint16_t name_size = ntohs (sres->name_size); + + if (GNUNET_YES == GNUNET_OP_get (h->op, GNUNET_ntohll (sres->op_id), + NULL, NULL, (void **) &op)) + { + GNUNET_assert (NULL != op); + if (NULL != op->state_cb) + op->state_cb (op->cls, name, (char *) &sres[1] + name_size, + ntohs (sres->header.size) - sizeof (*sres) - name_size); + //GNUNET_OP_remove (h->op, GNUNET_ntohll (sres->op_id)); + //GNUNET_free (op); + } + else + { + LOG (GNUNET_ERROR_TYPE_DEBUG, + "handle_result_state: No callback registered for OP ID %" PRIu64 ".\n", + GNUNET_ntohll (sres->op_id)); + } + h->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS; +} + + +static void +reconnect (void *cls); + + +/** + * Client disconnected from service. + * + * Reconnect after backoff period.= + */ +static void +disconnected (void *cls, enum GNUNET_MQ_Error error) +{ + struct GNUNET_PSYCSTORE_Handle *h = cls; + + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Origin client disconnected (%d), re-connecting\n", + (int) error); + if (NULL != h->mq) + { + GNUNET_MQ_destroy (h->mq); + GNUNET_OP_destroy (h->op); + h->mq = NULL; + h->op = NULL; + } + + h->reconnect_task = GNUNET_SCHEDULER_add_delayed (h->reconnect_delay, + &reconnect, h); + h->reconnect_delay = GNUNET_TIME_STD_BACKOFF (h->reconnect_delay); +} + + +static void +do_connect (struct GNUNET_PSYCSTORE_Handle *h) +{ + LOG (GNUNET_ERROR_TYPE_DEBUG, + "Connecting to PSYCstore service.\n"); + + struct GNUNET_MQ_MessageHandler handlers[] = { + GNUNET_MQ_hd_var_size (result_code, + GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_CODE, + struct OperationResult, + h), + GNUNET_MQ_hd_fixed_size (result_counters, + GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_COUNTERS, + struct CountersResult, + h), + GNUNET_MQ_hd_var_size (result_fragment, + GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_FRAGMENT, + struct FragmentResult, + h), + GNUNET_MQ_hd_var_size (result_state, + GNUNET_MESSAGE_TYPE_PSYCSTORE_RESULT_STATE, + struct StateResult, + h), + GNUNET_MQ_handler_end () + }; + + h->op = GNUNET_OP_create (); + GNUNET_assert (NULL == h->mq); + h->mq = GNUNET_CLIENT_connect (h->cfg, "psycstore", + handlers, disconnected, h); + GNUNET_assert (NULL != h->mq); +} + + +/** + * Try again to connect to the PSYCstore service. + * + * @param cls Handle to the PSYCstore service. + */ +static void +reconnect (void *cls) +{ + struct GNUNET_PSYCSTORE_Handle *h = cls; + + h->reconnect_task = NULL; + do_connect (cls); +} + + +/** + * Connect to the PSYCstore service. + * + * @param cfg The configuration to use + * @return Handle to use + */ +struct GNUNET_PSYCSTORE_Handle * +GNUNET_PSYCSTORE_connect (const struct GNUNET_CONFIGURATION_Handle *cfg) +{ + struct GNUNET_PSYCSTORE_Handle *h + = GNUNET_new (struct GNUNET_PSYCSTORE_Handle); + h->cfg = cfg; + h->reconnect_delay = GNUNET_TIME_UNIT_MILLISECONDS; + do_connect (h); + return h; +} + + +/** + * Disconnect from PSYCstore service + * + * @param h Handle to destroy + */ +void +GNUNET_PSYCSTORE_disconnect (struct GNUNET_PSYCSTORE_Handle *h) +{ + GNUNET_assert (NULL != h); + if (h->reconnect_task != NULL) + { + GNUNET_SCHEDULER_cancel (h->reconnect_task); + h->reconnect_task = NULL; + } + if (NULL != h->mq) + { + // FIXME: free data structures for pending operations + GNUNET_MQ_destroy (h->mq); + h->mq = NULL; + } + GNUNET_free (h); +} + + +/** + * Message sent notification. + * + * Remove invalidated envelope pointer. + */ +static void +message_sent (void *cls) +{ + struct GNUNET_PSYCSTORE_OperationHandle *op = cls; + op->env = NULL; +} + + +/** + * Create a new operation. + */ +static struct GNUNET_PSYCSTORE_OperationHandle * +op_create (struct GNUNET_PSYCSTORE_Handle *h, + struct GNUNET_OP_Handle *hop, + GNUNET_PSYCSTORE_ResultCallback result_cb, + void *cls) +{ + struct GNUNET_PSYCSTORE_OperationHandle * + op = GNUNET_malloc (sizeof (*op)); + op->h = h; + op->op_id = GNUNET_OP_add (hop, + (GNUNET_ResultCallback) result_cb, + cls, op); + return op; +} + + +/** + * Send a message associated with an operation. + * + * @param h + * PSYCstore handle. + * @param op + * Operation handle. + * @param env + * Message envelope to send. + * @param[out] op_id + * Operation ID to write in network byte order. NULL if not needed. + * + * @return Operation handle. + * + */ +static struct GNUNET_PSYCSTORE_OperationHandle * +op_send (struct GNUNET_PSYCSTORE_Handle *h, + struct GNUNET_PSYCSTORE_OperationHandle *op, + struct GNUNET_MQ_Envelope *env, + uint64_t *op_id) +{ + op->env = env; + if (NULL != op_id) + *op_id = GNUNET_htonll (op->op_id); + + GNUNET_MQ_notify_sent (env, message_sent, op); + GNUNET_MQ_send (h->mq, env); + return op; +} + + +/** + * Cancel a PSYCstore operation. Note that the operation MAY still + * be executed; this merely cancels the continuation; if the request + * was already transmitted, the service may still choose to complete + * the operation. + * + * @param op Operation to cancel. + * + * @return #GNUNET_YES if message was not sent yet and got discarded, + * #GNUNET_NO if it was already sent, and only the callbacks got cancelled. + */ +int +GNUNET_PSYCSTORE_operation_cancel (struct GNUNET_PSYCSTORE_OperationHandle *op) +{ + struct GNUNET_PSYCSTORE_Handle *h = op->h; + int ret = GNUNET_NO; + + if (NULL != op->env) + { + GNUNET_MQ_send_cancel (op->env); + ret = GNUNET_YES; + } + + GNUNET_OP_remove (h->op, op->op_id); + GNUNET_free (op); + + return ret; +} + + +/** + * Store join/leave events for a PSYC channel in order to be able to answer + * membership test queries later. + * + * @param h + * Handle for the PSYCstore. + * @param channel_key + * The channel where the event happened. + * @param slave_key + * Public key of joining/leaving slave. + * @param did_join + * #GNUNET_YES on join, #GNUNET_NO on part. + * @param announced_at + * ID of the message that announced the membership change. + * @param effective_since + * Message ID this membership change is in effect since. + * For joins it is <= announced_at, for parts it is always 0. + * @param group_generation + * In case of a part, the last group generation the slave has access to. + * It has relevance when a larger message have fragments with different + * group generations. + * @param result_cb + * Callback to call with the result of the storage operation. + * @param cls + * Closure for the callback. + * + * @return Operation handle that can be used to cancel the operation. + */ +struct GNUNET_PSYCSTORE_OperationHandle * +GNUNET_PSYCSTORE_membership_store (struct GNUNET_PSYCSTORE_Handle *h, + const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, + const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key, + int did_join, + uint64_t announced_at, + uint64_t effective_since, + uint64_t group_generation, + GNUNET_PSYCSTORE_ResultCallback result_cb, + void *cls) +{ + GNUNET_assert (NULL != h); + GNUNET_assert (NULL != channel_key); + GNUNET_assert (NULL != slave_key); + GNUNET_assert (GNUNET_YES == did_join || GNUNET_NO == did_join); + GNUNET_assert (did_join + ? effective_since <= announced_at + : effective_since == 0); + + struct MembershipStoreRequest *req; + struct GNUNET_MQ_Envelope * + env = GNUNET_MQ_msg (req, GNUNET_MESSAGE_TYPE_PSYCSTORE_MEMBERSHIP_STORE); + req->channel_key = *channel_key; + req->slave_key = *slave_key; + req->did_join = did_join; + req->announced_at = GNUNET_htonll (announced_at); + req->effective_since = GNUNET_htonll (effective_since); + req->group_generation = GNUNET_htonll (group_generation); + + return + op_send (h, op_create (h, h->op, result_cb, cls), + env, &req->op_id); +} + + +/** + * Test if a member was admitted to the channel at the given message ID. + * + * This is useful when relaying and replaying messages to check if a particular + * slave has access to the message fragment with a given group generation. It + * is also used when handling join requests to determine whether the slave is + * currently admitted to the channel. + * + * @param h + * Handle for the PSYCstore. + * @param channel_key + * The channel we are interested in. + * @param slave_key + * Public key of slave whose membership to check. + * @param message_id + * Message ID for which to do the membership test. + * @param group_generation + * Group generation of the fragment of the message to test. + * It has relevance if the message consists of multiple fragments with + * different group generations. + * @param result_cb + * Callback to call with the test result. + * @param cls + * Closure for the callback. + * + * @return Operation handle that can be used to cancel the operation. + */ +struct GNUNET_PSYCSTORE_OperationHandle * +GNUNET_PSYCSTORE_membership_test (struct GNUNET_PSYCSTORE_Handle *h, + const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, + const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key, + uint64_t message_id, + uint64_t group_generation, + GNUNET_PSYCSTORE_ResultCallback result_cb, + void *cls) +{ + struct MembershipTestRequest *req; + struct GNUNET_MQ_Envelope * + env = GNUNET_MQ_msg (req, GNUNET_MESSAGE_TYPE_PSYCSTORE_MEMBERSHIP_TEST); + req->channel_key = *channel_key; + req->slave_key = *slave_key; + req->message_id = GNUNET_htonll (message_id); + req->group_generation = GNUNET_htonll (group_generation); + + return + op_send (h, op_create (h, h->op, result_cb, cls), + env, &req->op_id); +} + + +/** + * Store a message fragment sent to a channel. + * + * @param h Handle for the PSYCstore. + * @param channel_key The channel the message belongs to. + * @param message Message to store. + * @param psycstore_flags Flags indicating whether the PSYC message contains + * state modifiers. + * @param result_cb Callback to call with the result of the operation. + * @param cls Closure for the callback. + * + * @return Handle that can be used to cancel the operation. + */ +struct GNUNET_PSYCSTORE_OperationHandle * +GNUNET_PSYCSTORE_fragment_store (struct GNUNET_PSYCSTORE_Handle *h, + const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, + const struct GNUNET_MULTICAST_MessageHeader *msg, + enum GNUNET_PSYCSTORE_MessageFlags psycstore_flags, + GNUNET_PSYCSTORE_ResultCallback result_cb, + void *cls) +{ + uint16_t size = ntohs (msg->header.size); + struct FragmentStoreRequest *req; + struct GNUNET_MQ_Envelope * + env = GNUNET_MQ_msg_extra (req, size, + GNUNET_MESSAGE_TYPE_PSYCSTORE_FRAGMENT_STORE); + req->channel_key = *channel_key; + req->psycstore_flags = htonl (psycstore_flags); + GNUNET_memcpy (&req[1], msg, size); + + return + op_send (h, op_create (h, h->op, result_cb, cls), + env, &req->op_id); +} + + +/** + * Retrieve message fragments by fragment ID range. + * + * @param h + * Handle for the PSYCstore. + * @param channel_key + * The channel we are interested in. + * @param slave_key + * The slave requesting the fragment. If not NULL, a membership test is + * performed first and the fragment is only returned if the slave has + * access to it. + * @param first_fragment_id + * First fragment ID to retrieve. + * Use 0 to get the latest message fragment. + * @param last_fragment_id + * Last consecutive fragment ID to retrieve. + * Use 0 to get the latest message fragment. + * @param fragment_limit + * Maximum number of fragments to retrieve. + * @param fragment_cb + * Callback to call with the retrieved fragments. + * @param result_cb + * Callback to call with the result of the operation. + * @param cls + * Closure for the callbacks. + * + * @return Handle that can be used to cancel the operation. + */ +struct GNUNET_PSYCSTORE_OperationHandle * +GNUNET_PSYCSTORE_fragment_get (struct GNUNET_PSYCSTORE_Handle *h, + const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, + const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key, + uint64_t first_fragment_id, + uint64_t last_fragment_id, + GNUNET_PSYCSTORE_FragmentCallback fragment_cb, + GNUNET_PSYCSTORE_ResultCallback result_cb, + void *cls) +{ + struct FragmentGetRequest *req; + struct GNUNET_MQ_Envelope * + env = GNUNET_MQ_msg (req, GNUNET_MESSAGE_TYPE_PSYCSTORE_FRAGMENT_GET); + req->channel_key = *channel_key; + req->first_fragment_id = GNUNET_htonll (first_fragment_id); + req->last_fragment_id = GNUNET_htonll (last_fragment_id); + if (NULL != slave_key) + { + req->slave_key = *slave_key; + req->do_membership_test = GNUNET_YES; + } + + struct GNUNET_PSYCSTORE_OperationHandle * + op = op_create (h, h->op, result_cb, cls); + op->fragment_cb = fragment_cb; + op->cls = cls; + return op_send (h, op, env, &req->op_id); +} + + +/** + * Retrieve latest message fragments. + * + * @param h + * Handle for the PSYCstore. + * @param channel_key + * The channel we are interested in. + * @param slave_key + * The slave requesting the fragment. If not NULL, a membership test is + * performed first and the fragment is only returned if the slave has + * access to it. + * @param first_fragment_id + * First fragment ID to retrieve. + * Use 0 to get the latest message fragment. + * @param last_fragment_id + * Last consecutive fragment ID to retrieve. + * Use 0 to get the latest message fragment. + * @param fragment_limit + * Maximum number of fragments to retrieve. + * @param fragment_cb + * Callback to call with the retrieved fragments. + * @param result_cb + * Callback to call with the result of the operation. + * @param cls + * Closure for the callbacks. + * + * @return Handle that can be used to cancel the operation. + */ +struct GNUNET_PSYCSTORE_OperationHandle * +GNUNET_PSYCSTORE_fragment_get_latest (struct GNUNET_PSYCSTORE_Handle *h, + const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, + const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key, + uint64_t fragment_limit, + GNUNET_PSYCSTORE_FragmentCallback fragment_cb, + GNUNET_PSYCSTORE_ResultCallback result_cb, + void *cls) +{ + struct FragmentGetRequest *req; + struct GNUNET_MQ_Envelope * + env = GNUNET_MQ_msg (req, GNUNET_MESSAGE_TYPE_PSYCSTORE_FRAGMENT_GET); + req->channel_key = *channel_key; + req->fragment_limit = GNUNET_ntohll (fragment_limit); + if (NULL != slave_key) + { + req->slave_key = *slave_key; + req->do_membership_test = GNUNET_YES; + } + + struct GNUNET_PSYCSTORE_OperationHandle * + op = op_create (h, h->op, result_cb, cls); + op->fragment_cb = fragment_cb; + op->cls = cls; + return op_send (h, op, env, &req->op_id); +} + + +/** + * Retrieve all fragments of messages in a message ID range. + * + * @param h + * Handle for the PSYCstore. + * @param channel_key + * The channel we are interested in. + * @param slave_key + * The slave requesting the message. + * If not NULL, a membership test is performed first + * and the message is only returned if the slave has access to it. + * @param first_message_id + * First message ID to retrieve. + * @param last_message_id + * Last consecutive message ID to retrieve. + * @param fragment_limit + * Maximum number of fragments to retrieve. + * @param method_prefix + * Retrieve only messages with a matching method prefix. + * @todo Implement method_prefix query. + * @param fragment_cb + * Callback to call with the retrieved fragments. + * @param result_cb + * Callback to call with the result of the operation. + * @param cls + * Closure for the callbacks. + * + * @return Handle that can be used to cancel the operation. + */ +struct GNUNET_PSYCSTORE_OperationHandle * +GNUNET_PSYCSTORE_message_get (struct GNUNET_PSYCSTORE_Handle *h, + const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, + const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key, + uint64_t first_message_id, + uint64_t last_message_id, + uint64_t fragment_limit, + const char *method_prefix, + GNUNET_PSYCSTORE_FragmentCallback fragment_cb, + GNUNET_PSYCSTORE_ResultCallback result_cb, + void *cls) +{ + struct MessageGetRequest *req; + if (NULL == method_prefix) + method_prefix = ""; + uint16_t method_size = strnlen (method_prefix, + GNUNET_MAX_MESSAGE_SIZE + - sizeof (*req)) + 1; + + struct GNUNET_MQ_Envelope * + env = GNUNET_MQ_msg_extra (req, method_size, + GNUNET_MESSAGE_TYPE_PSYCSTORE_MESSAGE_GET); + req->channel_key = *channel_key; + req->first_message_id = GNUNET_htonll (first_message_id); + req->last_message_id = GNUNET_htonll (last_message_id); + req->fragment_limit = GNUNET_htonll (fragment_limit); + if (NULL != slave_key) + { + req->slave_key = *slave_key; + req->do_membership_test = GNUNET_YES; + } + GNUNET_memcpy (&req[1], method_prefix, method_size); + ((char *) &req[1])[method_size - 1] = '\0'; + + struct GNUNET_PSYCSTORE_OperationHandle * + op = op_create (h, h->op, result_cb, cls); + op->fragment_cb = fragment_cb; + op->cls = cls; + return op_send (h, op, env, &req->op_id); +} + + +/** + * Retrieve all fragments of the latest messages. + * + * @param h + * Handle for the PSYCstore. + * @param channel_key + * The channel we are interested in. + * @param slave_key + * The slave requesting the message. + * If not NULL, a membership test is performed first + * and the message is only returned if the slave has access to it. + * @param message_limit + * Maximum number of messages to retrieve. + * @param method_prefix + * Retrieve only messages with a matching method prefix. + * @todo Implement method_prefix query. + * @param fragment_cb + * Callback to call with the retrieved fragments. + * @param result_cb + * Callback to call with the result of the operation. + * @param cls + * Closure for the callbacks. + * + * @return Handle that can be used to cancel the operation. + */ +struct GNUNET_PSYCSTORE_OperationHandle * +GNUNET_PSYCSTORE_message_get_latest (struct GNUNET_PSYCSTORE_Handle *h, + const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, + const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key, + uint64_t message_limit, + const char *method_prefix, + GNUNET_PSYCSTORE_FragmentCallback fragment_cb, + GNUNET_PSYCSTORE_ResultCallback result_cb, + void *cls) +{ + struct MessageGetRequest *req; + + if (NULL == method_prefix) + method_prefix = ""; + uint16_t method_size = strnlen (method_prefix, + GNUNET_MAX_MESSAGE_SIZE + - sizeof (*req)) + 1; + GNUNET_assert ('\0' == method_prefix[method_size - 1]); + + struct GNUNET_MQ_Envelope * + env = GNUNET_MQ_msg_extra (req, method_size, + GNUNET_MESSAGE_TYPE_PSYCSTORE_MESSAGE_GET); + req->channel_key = *channel_key; + req->message_limit = GNUNET_ntohll (message_limit); + if (NULL != slave_key) + { + req->slave_key = *slave_key; + req->do_membership_test = GNUNET_YES; + } + GNUNET_memcpy (&req[1], method_prefix, method_size); + + struct GNUNET_PSYCSTORE_OperationHandle * + op = op_create (h, h->op, result_cb, cls); + op->fragment_cb = fragment_cb; + op->cls = cls; + return op_send (h, op, env, &req->op_id); +} + + +/** + * Retrieve a fragment of message specified by its message ID and fragment + * offset. + * + * @param h + * Handle for the PSYCstore. + * @param channel_key + * The channel we are interested in. + * @param slave_key + * The slave requesting the message fragment. If not NULL, a membership + * test is performed first and the message fragment is only returned + * if the slave has access to it. + * @param message_id + * Message ID to retrieve. Use 0 to get the latest message. + * @param fragment_offset + * Offset of the fragment to retrieve. + * @param fragment_cb + * Callback to call with the retrieved fragments. + * @param result_cb + * Callback to call with the result of the operation. + * @param cls + * Closure for the callbacks. + * + * @return Handle that can be used to cancel the operation. + */ +struct GNUNET_PSYCSTORE_OperationHandle * +GNUNET_PSYCSTORE_message_get_fragment (struct GNUNET_PSYCSTORE_Handle *h, + const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, + const struct GNUNET_CRYPTO_EcdsaPublicKey *slave_key, + uint64_t message_id, + uint64_t fragment_offset, + GNUNET_PSYCSTORE_FragmentCallback fragment_cb, + GNUNET_PSYCSTORE_ResultCallback result_cb, + void *cls) +{ + struct MessageGetFragmentRequest *req; + struct GNUNET_MQ_Envelope * + env = GNUNET_MQ_msg (req, GNUNET_MESSAGE_TYPE_PSYCSTORE_MESSAGE_GET_FRAGMENT); + + req->channel_key = *channel_key; + req->message_id = GNUNET_htonll (message_id); + req->fragment_offset = GNUNET_htonll (fragment_offset); + if (NULL != slave_key) + { + req->slave_key = *slave_key; + req->do_membership_test = GNUNET_YES; + } + + struct GNUNET_PSYCSTORE_OperationHandle * + op = op_create (h, h->op, result_cb, cls); + op->fragment_cb = fragment_cb; + op->cls = cls; + return op_send (h, op, env, &req->op_id); +} + + +/** + * Retrieve latest values of counters for a channel master. + * + * The current value of counters are needed when a channel master is restarted, + * so that it can continue incrementing the counters from their last value. + * + * @param h + * Handle for the PSYCstore. + * @param channel_key + * Public key that identifies the channel. + * @param ccb + * Callback to call with the result. + * @param ccb_cls + * Closure for the @a ccb callback. + * + * @return Handle that can be used to cancel the operation. + */ +struct GNUNET_PSYCSTORE_OperationHandle * +GNUNET_PSYCSTORE_counters_get (struct GNUNET_PSYCSTORE_Handle *h, + struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, + GNUNET_PSYCSTORE_CountersCallback counters_cb, + void *cls) +{ + struct OperationRequest *req; + struct GNUNET_MQ_Envelope * + env = GNUNET_MQ_msg (req, GNUNET_MESSAGE_TYPE_PSYCSTORE_COUNTERS_GET); + req->channel_key = *channel_key; + + struct GNUNET_PSYCSTORE_OperationHandle * + op = op_create (h, h->op, NULL, NULL); + op->counters_cb = counters_cb; + op->cls = cls; + return op_send (h, op, env, &req->op_id); +} + + +/** + * Apply modifiers of a message to the current channel state. + * + * An error is returned if there are missing messages containing state + * operations before the current one. + * + * @param h + * Handle for the PSYCstore. + * @param channel_key + * The channel we are interested in. + * @param message_id + * ID of the message that contains the @a modifiers. + * @param state_delta + * Value of the _state_delta PSYC header variable of the message. + * @param result_cb + * Callback to call with the result of the operation. + * @param cls + * Closure for @a result_cb. + * + * @return Handle that can be used to cancel the operation. + */ +struct GNUNET_PSYCSTORE_OperationHandle * +GNUNET_PSYCSTORE_state_modify (struct GNUNET_PSYCSTORE_Handle *h, + const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, + uint64_t message_id, + uint64_t state_delta, + GNUNET_PSYCSTORE_ResultCallback result_cb, + void *cls) +{ + struct StateModifyRequest *req; + struct GNUNET_MQ_Envelope * + env = GNUNET_MQ_msg (req, GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_MODIFY); + req->channel_key = *channel_key; + req->message_id = GNUNET_htonll (message_id); + req->state_delta = GNUNET_htonll (state_delta); + + return op_send (h, op_create (h, h->op, result_cb, cls), + env, &req->op_id); +} + + +struct StateSyncClosure +{ + GNUNET_PSYCSTORE_ResultCallback result_cb; + void *cls; + uint8_t last; +}; + + +static void +state_sync_result (void *cls, int64_t result, + const char *err_msg, uint16_t err_msg_size) +{ + struct StateSyncClosure *ssc = cls; + if (GNUNET_OK != result || ssc->last) + ssc->result_cb (ssc->cls, result, err_msg, err_msg_size); + GNUNET_free (ssc); +} + + +/** + * Store synchronized state. + * + * @param h + * Handle for the PSYCstore. + * @param channel_key + * The channel we are interested in. + * @param max_state_message_id + * ID of the last stateful message before @a state_hash_message_id. + * @param state_hash_message_id + * ID of the message that contains the state_hash PSYC header variable. + * @param modifier_count + * Number of elements in the @a modifiers array. + * @param modifiers + * Full state to store. + * @param result_cb + * Callback to call with the result of the operation. + * @param cls + * Closure for the callback. + * + * @return Handle that can be used to cancel the operation. + */ +struct GNUNET_PSYCSTORE_OperationHandle * +GNUNET_PSYCSTORE_state_sync (struct GNUNET_PSYCSTORE_Handle *h, + const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, + uint64_t max_state_message_id, + uint64_t state_hash_message_id, + size_t modifier_count, + const struct GNUNET_PSYC_Modifier *modifiers, + GNUNET_PSYCSTORE_ResultCallback result_cb, + void *cls) +{ + struct GNUNET_PSYCSTORE_OperationHandle *op = NULL; + size_t i; + + for (i = 0; i < modifier_count; i++) { + struct StateSyncRequest *req; + uint16_t name_size = strlen (modifiers[i].name) + 1; + + struct GNUNET_MQ_Envelope * + env = GNUNET_MQ_msg_extra (req, + sizeof (*req) + name_size + modifiers[i].value_size, + GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_SYNC); + + req->header.type = htons (GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_SYNC); + req->header.size = htons (sizeof (*req) + name_size + + modifiers[i].value_size); + req->channel_key = *channel_key; + req->max_state_message_id = GNUNET_htonll (max_state_message_id); + req->state_hash_message_id = GNUNET_htonll (state_hash_message_id); + req->name_size = htons (name_size); + req->flags + = (0 == i) + ? STATE_OP_FIRST + : (modifier_count - 1 == i) + ? STATE_OP_LAST + : 0; + + GNUNET_memcpy (&req[1], modifiers[i].name, name_size); + GNUNET_memcpy ((char *) &req[1] + name_size, modifiers[i].value, modifiers[i].value_size); + + struct StateSyncClosure *ssc = GNUNET_malloc (sizeof (*ssc)); + ssc->last = (req->flags & STATE_OP_LAST); + ssc->result_cb = result_cb; + ssc->cls = cls; + + op_send (h, op_create (h, h->op, state_sync_result, ssc), + env, &req->op_id); + } + // FIXME: only one operation is returned, + // add pointers to other operations and make all cancellable. + return op; +} + + +/** + * Reset the state of a channel. + * + * Delete all state variables stored for the given channel. + * + * @param h + * Handle for the PSYCstore. + * @param channel_key + * The channel we are interested in. + * @param result_cb + * Callback to call with the result of the operation. + * @param cls + * Closure for the callback. + * + * @return Handle that can be used to cancel the operation. + */ +struct GNUNET_PSYCSTORE_OperationHandle * +GNUNET_PSYCSTORE_state_reset (struct GNUNET_PSYCSTORE_Handle *h, + const struct GNUNET_CRYPTO_EddsaPublicKey + *channel_key, + GNUNET_PSYCSTORE_ResultCallback result_cb, + void *cls) +{ + struct OperationRequest *req; + struct GNUNET_MQ_Envelope * + env = GNUNET_MQ_msg (req, GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_RESET); + req->channel_key = *channel_key; + + return + op_send (h, op_create (h, h->op, result_cb, cls), + env, &req->op_id); +} + + +/** + * Update signed values of state variables in the state store. + * + * @param h + * Handle for the PSYCstore. + * @param channel_key + * The channel we are interested in. + * @param message_id + * Message ID that contained the state @a hash. + * @param hash + * Hash of the serialized full state. + * @param result_cb + * Callback to call with the result of the operation. + * @param cls + * Closure for the callback. + */ +struct GNUNET_PSYCSTORE_OperationHandle * +GNUNET_PSYCSTORE_state_hash_update (struct GNUNET_PSYCSTORE_Handle *h, + const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, + uint64_t message_id, + const struct GNUNET_HashCode *hash, + GNUNET_PSYCSTORE_ResultCallback result_cb, + void *cls) +{ + struct StateHashUpdateRequest *req; + struct GNUNET_MQ_Envelope * + env = GNUNET_MQ_msg (req, GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_HASH_UPDATE); + req->channel_key = *channel_key; + req->hash = *hash; + + return + op_send (h, op_create (h, h->op, result_cb, cls), + env, &req->op_id); +} + + +/** + * Retrieve the best matching state variable. + * + * @param h + * Handle for the PSYCstore. + * @param channel_key + * The channel we are interested in. + * @param name + * Name of variable to match, the returned variable might be less specific. + * @param state_cb + * Callback to return the matching state variable. + * @param result_cb + * Callback to call with the result of the operation. + * @param cls + * Closure for the callbacks. + * + * @return Handle that can be used to cancel the operation. + */ +struct GNUNET_PSYCSTORE_OperationHandle * +GNUNET_PSYCSTORE_state_get (struct GNUNET_PSYCSTORE_Handle *h, + const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, + const char *name, + GNUNET_PSYCSTORE_StateCallback state_cb, + GNUNET_PSYCSTORE_ResultCallback result_cb, + void *cls) +{ + size_t name_size = strlen (name) + 1; + struct OperationRequest *req; + struct GNUNET_MQ_Envelope * + env = GNUNET_MQ_msg_extra (req, name_size, + GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_GET); + req->channel_key = *channel_key; + GNUNET_memcpy (&req[1], name, name_size); + + struct GNUNET_PSYCSTORE_OperationHandle * + op = op_create (h, h->op, result_cb, cls); + op->state_cb = state_cb; + op->cls = cls; + return op_send (h, op, env, &req->op_id); +} + + +/** + * Retrieve all state variables for a channel with the given prefix. + * + * @param h + * Handle for the PSYCstore. + * @param channel_key + * The channel we are interested in. + * @param name_prefix + * Prefix of state variable names to match. + * @param state_cb + * Callback to return matching state variables. + * @param result_cb + * Callback to call with the result of the operation. + * @param cls + * Closure for the callbacks. + * + * @return Handle that can be used to cancel the operation. + */ +struct GNUNET_PSYCSTORE_OperationHandle * +GNUNET_PSYCSTORE_state_get_prefix (struct GNUNET_PSYCSTORE_Handle *h, + const struct GNUNET_CRYPTO_EddsaPublicKey *channel_key, + const char *name_prefix, + GNUNET_PSYCSTORE_StateCallback state_cb, + GNUNET_PSYCSTORE_ResultCallback result_cb, + void *cls) +{ + size_t name_size = strlen (name_prefix) + 1; + struct OperationRequest *req; + struct GNUNET_MQ_Envelope * + env = GNUNET_MQ_msg_extra (req, name_size, + GNUNET_MESSAGE_TYPE_PSYCSTORE_STATE_GET_PREFIX); + req->channel_key = *channel_key; + GNUNET_memcpy (&req[1], name_prefix, name_size); + + struct GNUNET_PSYCSTORE_OperationHandle * + op = op_create (h, h->op, result_cb, cls); + op->state_cb = state_cb; + op->cls = cls; + return op_send (h, op, env, &req->op_id); +} + +/* end of psycstore_api.c */ diff --git a/src/psycstore/test_plugin_psycstore.c b/src/psycstore/test_plugin_psycstore.c new file mode 100644 index 0000000..ff4eac8 --- /dev/null +++ b/src/psycstore/test_plugin_psycstore.c @@ -0,0 +1,532 @@ +/* + * This file is part of GNUnet + * Copyright (C) 2013 GNUnet e.V. + * + * GNUnet is free software: you can redistribute it and/or modify it + * under the terms of the GNU Affero General Public License as published + * by the Free Software Foundation, either version 3 of the License, + * or (at your option) any later version. + * + * GNUnet is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + + SPDX-License-Identifier: AGPL3.0-or-later + */ + +/** + * @author Gabor X Toth + * @author Christian Grothoff + * + * @file + * Test for the PSYCstore plugins. + */ + +#include <inttypes.h> + +#include "platform.h" +#include "gnunet_util_lib.h" +#include "gnunet_testing_lib.h" +#include "gnunet_psycstore_plugin.h" +#include "gnunet_psycstore_service.h" +#include "gnunet_multicast_service.h" + +#define DEBUG_PSYCSTORE GNUNET_EXTRA_LOGGING +#if DEBUG_PSYCSTORE +# define LOG_LEVEL "DEBUG" +#else +# define LOG_LEVEL "WARNING" +#endif + +#define C2ARG(str) str, (sizeof (str) - 1) + +#define LOG(kind,...) \ + GNUNET_log_from (kind, "test-plugin-psycstore", __VA_ARGS__) + +static int ok; + +/** + * Name of plugin under test. + */ +static const char *plugin_name; + +static struct GNUNET_CRYPTO_EddsaPrivateKey *channel_key; +static struct GNUNET_CRYPTO_EcdsaPrivateKey *slave_key; + +static struct GNUNET_CRYPTO_EddsaPublicKey channel_pub_key; +static struct GNUNET_CRYPTO_EcdsaPublicKey slave_pub_key; + +/** + * Function called when the service shuts down. Unloads our psycstore + * plugin. + * + * @param api api to unload + */ +static void +unload_plugin (struct GNUNET_PSYCSTORE_PluginFunctions *api) +{ + char *libname; + + GNUNET_asprintf (&libname, "libgnunet_plugin_psycstore_%s", plugin_name); + GNUNET_break (NULL == GNUNET_PLUGIN_unload (libname, api)); + GNUNET_free (libname); +} + + +/** + * Load the psycstore plugin. + * + * @param cfg configuration to pass + * @return NULL on error + */ +static struct GNUNET_PSYCSTORE_PluginFunctions * +load_plugin (const struct GNUNET_CONFIGURATION_Handle *cfg) +{ + struct GNUNET_PSYCSTORE_PluginFunctions *ret; + char *libname; + + GNUNET_log (GNUNET_ERROR_TYPE_INFO, _ ("Loading `%s' psycstore plugin\n"), + plugin_name); + GNUNET_asprintf (&libname, "libgnunet_plugin_psycstore_%s", plugin_name); + if (NULL == (ret = GNUNET_PLUGIN_load (libname, (void*) cfg))) + { + FPRINTF (stderr, "Failed to load plugin `%s'!\n", plugin_name); + return NULL; + } + GNUNET_free (libname); + return ret; +} + + +#define MAX_MSG 16 + +struct FragmentClosure +{ + uint8_t n; + uint64_t flags[MAX_MSG]; + struct GNUNET_MULTICAST_MessageHeader *msg[MAX_MSG]; +}; + +static int +fragment_cb (void *cls, struct GNUNET_MULTICAST_MessageHeader *msg2, + enum GNUNET_PSYCSTORE_MessageFlags flags) +{ + struct FragmentClosure *fcls = cls; + struct GNUNET_MULTICAST_MessageHeader *msg1; + uint64_t flags1; + int ret; + + if (fcls->n >= MAX_MSG) + { + GNUNET_break (0); + return GNUNET_SYSERR; + } + msg1 = fcls->msg[fcls->n]; + flags1 = fcls->flags[fcls->n++]; + if (NULL == msg1) + { + GNUNET_break (0); + return GNUNET_SYSERR; + } + + if (flags1 == flags && msg1->header.size == msg2->header.size + && 0 == memcmp (msg1, msg2, ntohs (msg1->header.size))) + { + LOG (GNUNET_ERROR_TYPE_DEBUG, "Fragment %llu matches\n", + GNUNET_ntohll (msg1->fragment_id)); + ret = GNUNET_YES; + } + else + { + LOG (GNUNET_ERROR_TYPE_ERROR, "Fragment %llu differs\n", + GNUNET_ntohll (msg1->fragment_id)); + ret = GNUNET_SYSERR; + } + + GNUNET_free (msg2); + return ret; +} + + +struct StateClosure { + size_t n; + char *name[16]; + void *value[16]; + size_t value_size[16]; +}; + +static int +state_cb (void *cls, const char *name, const void *value, uint32_t value_size) +{ + struct StateClosure *scls = cls; + const void *val = scls->value[scls->n]; // FIXME: check for n out-of-bounds FIRST! + size_t val_size = scls->value_size[scls->n++]; + + /* FIXME: check name */ + + LOG (GNUNET_ERROR_TYPE_DEBUG, + " name = %s, value_size = %u\n", + name, value_size); + + return GNUNET_YES; + return value_size == val_size && 0 == memcmp (value, val, val_size) + ? GNUNET_YES + : GNUNET_SYSERR; +} + + +static void +run (void *cls, char *const *args, const char *cfgfile, + const struct GNUNET_CONFIGURATION_Handle *cfg) +{ + struct GNUNET_PSYCSTORE_PluginFunctions *db; + + ok = 1; + db = load_plugin (cfg); + if (NULL == db) + { + FPRINTF (stderr, + "%s", + "Failed to initialize PSYCstore. " + "Database likely not setup, skipping test.\n"); + ok = 77; + return; + } + + /* Store & test membership */ + + LOG (GNUNET_ERROR_TYPE_INFO, "MEMBERSHIP\n"); + + channel_key = GNUNET_CRYPTO_eddsa_key_create (); + slave_key = GNUNET_CRYPTO_ecdsa_key_create (); + + GNUNET_CRYPTO_eddsa_key_get_public (channel_key, + &channel_pub_key); + GNUNET_CRYPTO_ecdsa_key_get_public (slave_key, &slave_pub_key); + + LOG (GNUNET_ERROR_TYPE_INFO, "membership_store()\n"); + + GNUNET_assert (GNUNET_OK == db->membership_store (db->cls, &channel_pub_key, + &slave_pub_key, GNUNET_YES, + 4, 2, 1)); + + LOG (GNUNET_ERROR_TYPE_INFO, "membership_test()\n"); + + GNUNET_assert (GNUNET_YES == db->membership_test (db->cls, &channel_pub_key, + &slave_pub_key, 4)); + + GNUNET_assert (GNUNET_YES == db->membership_test (db->cls, &channel_pub_key, + &slave_pub_key, 2)); + + GNUNET_assert (GNUNET_NO == db->membership_test (db->cls, &channel_pub_key, + &slave_pub_key, 1)); + + /* Store & get messages */ + + LOG (GNUNET_ERROR_TYPE_INFO, "MESSAGES\n"); + + struct GNUNET_MULTICAST_MessageHeader *msg + = GNUNET_malloc (sizeof (*msg) + sizeof (channel_pub_key)); + GNUNET_assert (msg != NULL); + + msg->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE); + msg->header.size = htons (sizeof (*msg) + sizeof (channel_pub_key)); + + uint64_t fragment_id = INT64_MAX - 1; + msg->fragment_id = GNUNET_htonll (fragment_id); + + uint64_t message_id = INT64_MAX - 10; + msg->message_id = GNUNET_htonll (message_id); + + uint64_t group_generation = INT64_MAX - 3; + msg->group_generation = GNUNET_htonll (group_generation); + + msg->hop_counter = htonl (9); + msg->fragment_offset = GNUNET_htonll (0); + msg->flags = htonl (GNUNET_MULTICAST_MESSAGE_LAST_FRAGMENT); + + GNUNET_memcpy (&msg[1], &channel_pub_key, sizeof (channel_pub_key)); + + msg->purpose.size = htonl (ntohs (msg->header.size) + - sizeof (msg->header) + - sizeof (msg->hop_counter) + - sizeof (msg->signature)); + msg->purpose.purpose = htonl (234); + GNUNET_assert (GNUNET_OK == + GNUNET_CRYPTO_eddsa_sign (channel_key, &msg->purpose, &msg->signature)); + + LOG (GNUNET_ERROR_TYPE_INFO, "fragment_store()\n"); + + struct FragmentClosure fcls = { 0 }; + fcls.n = 0; + fcls.msg[0] = msg; + fcls.flags[0] = GNUNET_PSYCSTORE_MESSAGE_STATE; + + GNUNET_assert ( + GNUNET_OK == db->fragment_store (db->cls, &channel_pub_key, msg, + fcls.flags[0])); + + LOG (GNUNET_ERROR_TYPE_INFO, "fragment_get(%" PRIu64 ")\n", fragment_id); + + uint64_t ret_frags = 0; + GNUNET_assert ( + GNUNET_OK == db->fragment_get (db->cls, &channel_pub_key, + fragment_id, fragment_id, + &ret_frags, fragment_cb, &fcls)); + GNUNET_assert (fcls.n == 1); + + LOG (GNUNET_ERROR_TYPE_INFO, "message_get_fragment()\n"); + + fcls.n = 0; + GNUNET_assert ( + GNUNET_OK == db->message_get_fragment (db->cls, &channel_pub_key, + GNUNET_ntohll (msg->message_id), + GNUNET_ntohll (msg->fragment_offset), + fragment_cb, &fcls)); + GNUNET_assert (fcls.n == 1); + + LOG (GNUNET_ERROR_TYPE_INFO, "message_add_flags()\n"); + GNUNET_assert ( + GNUNET_OK == db->message_add_flags (db->cls, &channel_pub_key, + GNUNET_ntohll (msg->message_id), + GNUNET_PSYCSTORE_MESSAGE_STATE_APPLIED)); + LOG (GNUNET_ERROR_TYPE_INFO, "fragment_get(%" PRIu64 ")\n", fragment_id); + + fcls.n = 0; + fcls.flags[0] |= GNUNET_PSYCSTORE_MESSAGE_STATE_APPLIED; + + GNUNET_assert ( + GNUNET_OK == db->fragment_get (db->cls, &channel_pub_key, + fragment_id, fragment_id, + &ret_frags, fragment_cb, &fcls)); + + GNUNET_assert (fcls.n == 1); + + LOG (GNUNET_ERROR_TYPE_INFO, "fragment_store()\n"); + + struct GNUNET_MULTICAST_MessageHeader *msg1 + = GNUNET_malloc (sizeof (*msg1) + sizeof (channel_pub_key)); + + GNUNET_memcpy (msg1, msg, sizeof (*msg1) + sizeof (channel_pub_key)); + + msg1->fragment_id = GNUNET_htonll (INT64_MAX); + msg1->fragment_offset = GNUNET_htonll (32768); + + fcls.n = 0; + fcls.msg[1] = msg1; + fcls.flags[1] = GNUNET_PSYCSTORE_MESSAGE_STATE_HASH; + + GNUNET_assert (GNUNET_OK == db->fragment_store (db->cls, &channel_pub_key, msg1, + fcls.flags[1])); + + LOG (GNUNET_ERROR_TYPE_INFO, "message_get()\n"); + + GNUNET_assert ( + GNUNET_OK == db->message_get (db->cls, &channel_pub_key, + message_id, message_id, 0, + &ret_frags, fragment_cb, &fcls)); + GNUNET_assert (fcls.n == 2 && ret_frags == 2); + + /* Message counters */ + + LOG (GNUNET_ERROR_TYPE_INFO, "counters_message_get()\n"); + + fragment_id = 0; + message_id = 0; + group_generation = 0; + GNUNET_assert ( + GNUNET_OK == db->counters_message_get (db->cls, &channel_pub_key, + &fragment_id, &message_id, + &group_generation) + && fragment_id == GNUNET_ntohll (msg1->fragment_id) + && message_id == GNUNET_ntohll (msg1->message_id) + && group_generation == GNUNET_ntohll (msg1->group_generation)); + + /* Modify state */ + + LOG (GNUNET_ERROR_TYPE_INFO, "STATE\n"); + + LOG (GNUNET_ERROR_TYPE_INFO, "state_modify_*()\n"); + + message_id = GNUNET_ntohll (fcls.msg[0]->message_id) + 1; + GNUNET_assert (GNUNET_OK == db->state_modify_begin (db->cls, &channel_pub_key, + message_id, 0)); + + GNUNET_assert (GNUNET_OK == db->state_modify_op (db->cls, &channel_pub_key, + GNUNET_PSYC_OP_ASSIGN, + "_foo", + C2ARG("one two three"))); + + GNUNET_assert (GNUNET_OK == db->state_modify_op (db->cls, &channel_pub_key, + GNUNET_PSYC_OP_ASSIGN, + "_foo_bar", slave_key, + sizeof (*slave_key))); + + GNUNET_assert (GNUNET_OK == db->state_modify_end (db->cls, &channel_pub_key, + message_id)); + + LOG (GNUNET_ERROR_TYPE_INFO, "state_get()\n"); + + struct StateClosure scls = { 0 }; + scls.n = 0; + scls.value[0] = "one two three"; + scls.value_size[0] = strlen ("one two three"); + + GNUNET_assert (GNUNET_OK == db->state_get (db->cls, &channel_pub_key, "_foo", + state_cb, &scls)); + GNUNET_assert (scls.n == 1); + + LOG (GNUNET_ERROR_TYPE_INFO, "state_get_prefix()\n"); + + scls.n = 0; + scls.value[1] = slave_key; + scls.value_size[1] = sizeof (*slave_key); + + GNUNET_assert (GNUNET_OK == db->state_get_prefix (db->cls, &channel_pub_key, + "_foo", state_cb, &scls)); + GNUNET_assert (scls.n == 2); + + LOG (GNUNET_ERROR_TYPE_INFO, "state_get_signed()\n"); + + scls.n = 0; + GNUNET_assert (GNUNET_NO == db->state_get_signed (db->cls, &channel_pub_key, + state_cb, &scls)); + GNUNET_assert (scls.n == 0); + + LOG (GNUNET_ERROR_TYPE_INFO, "state_update_signed()\n"); + + GNUNET_assert (GNUNET_OK == db->state_update_signed (db->cls, + &channel_pub_key)); + + LOG (GNUNET_ERROR_TYPE_INFO, "state_get_signed()\n"); + + scls.n = 0; + GNUNET_assert (GNUNET_YES == db->state_get_signed (db->cls, &channel_pub_key, + state_cb, &scls)); + GNUNET_assert (scls.n == 2); + + /* State counters */ + + LOG (GNUNET_ERROR_TYPE_INFO, "counters_state_get()\n"); + + uint64_t max_state_msg_id = 0; + GNUNET_assert (GNUNET_OK == db->counters_state_get (db->cls, &channel_pub_key, + &max_state_msg_id) + && max_state_msg_id == message_id); + + /* State sync */ + + LOG (GNUNET_ERROR_TYPE_INFO, "state_sync_*()\n"); + + scls.n = 0; + scls.value[0] = channel_key; + scls.value_size[0] = sizeof (*channel_key); + scls.value[1] = "three two one"; + scls.value_size[1] = strlen ("three two one"); + + GNUNET_assert (GNUNET_OK == db->state_sync_begin (db->cls, &channel_pub_key)); + + GNUNET_assert (GNUNET_OK == db->state_sync_assign (db->cls, &channel_pub_key, + "_sync_bar", scls.value[0], + scls.value_size[0])); + + GNUNET_assert (GNUNET_OK == db->state_sync_assign (db->cls, &channel_pub_key, + "_sync_foo", scls.value[1], + scls.value_size[1])); + + GNUNET_assert (GNUNET_OK == db->state_sync_end (db->cls, &channel_pub_key, + max_state_msg_id, + INT64_MAX - 5)); + + GNUNET_assert (GNUNET_NO == db->state_get_prefix (db->cls, &channel_pub_key, + "_foo", state_cb, &scls)); + GNUNET_assert (scls.n == 0); + + GNUNET_assert (GNUNET_OK == db->state_get_prefix (db->cls, &channel_pub_key, + "_sync", state_cb, &scls)); + GNUNET_assert (scls.n == 2); + + scls.n = 0; + GNUNET_assert (GNUNET_OK == db->state_get_signed (db->cls, &channel_pub_key, + state_cb, &scls)); + GNUNET_assert (scls.n == 2); + + /* Modify state after sync */ + + LOG (GNUNET_ERROR_TYPE_INFO, "state_modify_*()\n"); + + message_id = GNUNET_ntohll (fcls.msg[0]->message_id) + 6; + GNUNET_assert (GNUNET_OK == db->state_modify_begin (db->cls, &channel_pub_key, + message_id, + message_id - max_state_msg_id)); + + GNUNET_assert (GNUNET_OK == db->state_modify_op (db->cls, &channel_pub_key, + GNUNET_PSYC_OP_ASSIGN, + "_sync_foo", + C2ARG("five six seven"))); + + GNUNET_assert (GNUNET_OK == db->state_modify_end (db->cls, &channel_pub_key, + message_id)); + + /* Reset state */ + + LOG (GNUNET_ERROR_TYPE_INFO, "state_reset()\n"); + + scls.n = 0; + GNUNET_assert (GNUNET_OK == db->state_reset (db->cls, &channel_pub_key)); + GNUNET_assert (scls.n == 0); + + ok = 0; + + if (NULL != channel_key) + { + GNUNET_free (channel_key); + channel_key = NULL; + } + if (NULL != slave_key) + { + GNUNET_free (slave_key); + slave_key = NULL; + } + + unload_plugin (db); +} + + +int +main (int argc, char *argv[]) +{ + char cfg_name[128]; + char *const xargv[] = { + "test-plugin-psycstore", + "-c", cfg_name, + "-L", LOG_LEVEL, + NULL + }; + struct GNUNET_GETOPT_CommandLineOption options[] = { + GNUNET_GETOPT_OPTION_END + }; + GNUNET_DISK_directory_remove ("/tmp/gnunet-test-plugin-psycstore-sqlite"); + GNUNET_log_setup ("test-plugin-psycstore", LOG_LEVEL, NULL); + plugin_name = GNUNET_TESTING_get_testname_from_underscore (argv[0]); + GNUNET_snprintf (cfg_name, sizeof (cfg_name), "test_plugin_psycstore_%s.conf", + plugin_name); + GNUNET_PROGRAM_run ((sizeof (xargv) / sizeof (char *)) - 1, xargv, + "test-plugin-psycstore", "nohelp", options, &run, NULL); + + if ( (0 != ok) && + (77 != ok) ) + FPRINTF (stderr, "Missed some testcases: %d\n", ok); + +#if ! DEBUG_PSYCSTORE + GNUNET_DISK_directory_remove ("/tmp/gnunet-test-plugin-psycstore-sqlite"); +#endif + + return ok; +} + +/* end of test_plugin_psycstore.c */ diff --git a/src/psycstore/test_plugin_psycstore_mysql.conf b/src/psycstore/test_plugin_psycstore_mysql.conf new file mode 100644 index 0000000..e15b3fd --- /dev/null +++ b/src/psycstore/test_plugin_psycstore_mysql.conf @@ -0,0 +1,7 @@ +[psycstore-mysql] +DATABASE = test +# CONFIG = ~/.my.cnf +# USER = gnunet +# PASSWORD = +# HOST = localhost +# PORT = 3306 diff --git a/src/psycstore/test_plugin_psycstore_postgres.conf b/src/psycstore/test_plugin_psycstore_postgres.conf new file mode 100644 index 0000000..4b870dd --- /dev/null +++ b/src/psycstore/test_plugin_psycstore_postgres.conf @@ -0,0 +1,2 @@ +[psycstore-postgres] +CONFIG = connect_timeout=10; dbname=template1 diff --git a/src/psycstore/test_plugin_psycstore_sqlite.conf b/src/psycstore/test_plugin_psycstore_sqlite.conf new file mode 100644 index 0000000..498b1d0 --- /dev/null +++ b/src/psycstore/test_plugin_psycstore_sqlite.conf @@ -0,0 +1,2 @@ +[psycstore-sqlite] +FILENAME = $GNUNET_TMP/gnunet-test-plugin-psycstore-sqlite/sqlite.db diff --git a/src/psycstore/test_psycstore.c b/src/psycstore/test_psycstore.c new file mode 100644 index 0000000..ca50904 --- /dev/null +++ b/src/psycstore/test_psycstore.c @@ -0,0 +1,586 @@ +/* + * This file is part of GNUnet + * Copyright (C) 2013 GNUnet e.V. + * + * GNUnet is free software: you can redistribute it and/or modify it + * under the terms of the GNU Affero General Public License as published + * by the Free Software Foundation, either version 3 of the License, + * or (at your option) any later version. + * + * GNUnet is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + + SPDX-License-Identifier: AGPL3.0-or-later + */ + +/** + * @file psycstore/test_psycstore.c + * @brief Test for the PSYCstore service. + * @author Gabor X Toth + * @author Christian Grothoff + */ + +#include <inttypes.h> + +#include "platform.h" +#include "gnunet_util_lib.h" +#include "gnunet_common.h" +#include "gnunet_testing_lib.h" +#include "gnunet_psycstore_service.h" + +#define TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, 30) + + +/** + * Return value from 'main'. + */ +static int res; + +/** + * Handle to PSYCstore service. + */ +static struct GNUNET_PSYCSTORE_Handle *h; + +/** + * Handle to PSYCstore operation. + */ +static struct GNUNET_PSYCSTORE_OperationHandle *op; + +/** + * Handle for task for timeout termination. + */ +static struct GNUNET_SCHEDULER_Task *end_badly_task; + +static struct GNUNET_CRYPTO_EddsaPrivateKey *channel_key; +static struct GNUNET_CRYPTO_EcdsaPrivateKey *slave_key; + +static struct GNUNET_CRYPTO_EddsaPublicKey channel_pub_key; +static struct GNUNET_CRYPTO_EcdsaPublicKey slave_pub_key; + +static struct FragmentClosure +{ + uint8_t n; + uint8_t n_expected; + uint64_t flags[16]; + struct GNUNET_MULTICAST_MessageHeader *msg[16]; +} fcls; + +struct StateClosure { + size_t n; + char *name[16]; + void *value[16]; + size_t value_size[16]; +} scls; + +static struct GNUNET_PSYC_Modifier modifiers[16]; + +/** + * Clean up all resources used. + */ +static void +cleanup () +{ + if (NULL != op) + { + GNUNET_PSYCSTORE_operation_cancel (op); + op = NULL; + } + if (NULL != h) + { + GNUNET_PSYCSTORE_disconnect (h); + h = NULL; + } + if (NULL != channel_key) + { + GNUNET_free (channel_key); + channel_key = NULL; + } + if (NULL != slave_key) + { + GNUNET_free (slave_key); + slave_key = NULL; + } + GNUNET_SCHEDULER_shutdown (); +} + + +/** + * Terminate the testcase (failure). + * + * @param cls NULL + */ +static void +end_badly (void *cls) +{ + res = 1; + cleanup (); +} + + +/** + * Terminate the testcase (success). + * + * @param cls NULL + */ +static void +end_normally (void *cls) +{ + res = 0; + cleanup (); +} + + +/** + * Finish the testcase (successfully). + */ +static void +end () +{ + if (NULL != end_badly_task) + { + GNUNET_SCHEDULER_cancel (end_badly_task); + end_badly_task = NULL; + } + GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_MILLISECONDS, + &end_normally, NULL); +} + + +static void +state_reset_result (void *cls, + int64_t result, + const char *err_msg, + uint16_t err_msg_size) +{ + op = NULL; + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "state_reset_result:\t%d\n", + (int) result); + GNUNET_assert (GNUNET_OK == result); + + op = GNUNET_PSYCSTORE_state_reset (h, &channel_pub_key, + &state_reset_result, cls); + GNUNET_PSYCSTORE_operation_cancel (op); + op = NULL; + end (); +} + + +static int +state_result (void *cls, + const char *name, + const void *value, + uint32_t value_size) +{ + struct StateClosure *scls = cls; + const char *nam = scls->name[scls->n]; + const void *val = scls->value[scls->n]; + size_t val_size = scls->value_size[scls->n++]; + + if (value_size == val_size + && 0 == memcmp (value, val, val_size) + && 0 == strcmp (name, nam)) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + " variable %s matches\n", + name); + return GNUNET_YES; + } + else + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + " variable %s differs\nReceived: %.*s\nExpected: %.*s\n", + name, (int) value_size, (char*) value, (int) val_size, (char*) val); + GNUNET_assert (0); + return GNUNET_SYSERR; + } +} + + +static void +state_get_prefix_result (void *cls, int64_t result, + const char *err_msg, uint16_t err_msg_size) +{ + struct StateClosure *scls = cls; + op = NULL; + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "state_get_prefix_result:\t%ld\n", (long int) result); + GNUNET_assert (GNUNET_OK == result && 2 == scls->n); + + op = GNUNET_PSYCSTORE_state_reset (h, &channel_pub_key, + &state_reset_result, cls); +} + + +static void +state_get_result (void *cls, int64_t result, + const char *err_msg, uint16_t err_msg_size) +{ + op = NULL; + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "state_get_result:\t%ld\n", (long int) result); + GNUNET_assert (GNUNET_OK == result); + + scls.n = 0; + + scls.name[0] = "_sync_bar"; + scls.value[0] = "ten eleven twelve"; + scls.value_size[0] = sizeof ("ten eleven twelve") - 1; + + scls.name[1] = "_sync_foo"; + scls.value[1] = "three two one"; + scls.value_size[1] = sizeof ("three two one") - 1; + + op = GNUNET_PSYCSTORE_state_get_prefix (h, &channel_pub_key, "_sync", + &state_result, + &state_get_prefix_result, &scls); +} + + +static void +counters_result (void *cls, int status, uint64_t max_fragment_id, + uint64_t max_message_id, uint64_t max_group_generation, + uint64_t max_state_message_id) +{ + struct FragmentClosure *fcls = cls; + int result = 0; + op = NULL; + + if (GNUNET_OK == status + && max_fragment_id == GNUNET_ntohll (fcls->msg[2]->fragment_id) + && max_message_id == GNUNET_ntohll (fcls->msg[2]->message_id) + && max_group_generation == GNUNET_ntohll (fcls->msg[2]->group_generation) + && max_state_message_id == GNUNET_ntohll (fcls->msg[0]->message_id)) + result = 1; + + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "counters_get:\t%d\n", result); + GNUNET_assert (result == 1); + + scls.n = 0; + scls.name[0] = "_sync_bar"; + scls.value[0] = "ten eleven twelve"; + scls.value_size[0] = sizeof ("ten eleven twelve") - 1; + + op = GNUNET_PSYCSTORE_state_get (h, &channel_pub_key, "_sync_bar_x_yy_zzz", + &state_result, &state_get_result, &scls); +} + + +static void +state_modify_result (void *cls, int64_t result, + const char *err_msg, uint16_t err_msg_size) +{ + op = NULL; + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "state_modify_result:\t%ld\n", (long int) result); + GNUNET_assert (GNUNET_OK == result); + + op = GNUNET_PSYCSTORE_counters_get (h, &channel_pub_key, + &counters_result, cls); +} + + +static void +state_sync_result (void *cls, int64_t result, + const char *err_msg, uint16_t err_msg_size) +{ + struct FragmentClosure *fcls = cls; + op = NULL; + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "state_sync_result:\t%ld\n", (long int) result); + GNUNET_assert (GNUNET_OK == result); + + op = GNUNET_PSYCSTORE_state_modify (h, &channel_pub_key, + GNUNET_ntohll (fcls->msg[0]->message_id), + 0, state_modify_result, fcls); +} + + +static int +fragment_result (void *cls, + struct GNUNET_MULTICAST_MessageHeader *msg, + enum GNUNET_PSYCSTORE_MessageFlags flags) +{ + struct FragmentClosure *fcls = cls; + GNUNET_assert (fcls->n < fcls->n_expected); + struct GNUNET_MULTICAST_MessageHeader *msg0 = fcls->msg[fcls->n]; + uint64_t flags0 = fcls->flags[fcls->n++]; + + if (flags == flags0 && msg->header.size == msg0->header.size + && 0 == memcmp (msg, msg0, ntohs (msg->header.size))) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, " fragment %" PRIu64 " matches\n", + GNUNET_ntohll (msg->fragment_id)); + return GNUNET_YES; + } + else + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + " fragment differs: expected %" PRIu64 ", got %" PRIu64 "\n", + GNUNET_ntohll (msg0->fragment_id), + GNUNET_ntohll (msg->fragment_id)); + GNUNET_assert (0); + return GNUNET_SYSERR; + } +} + + +static void +message_get_latest_result (void *cls, int64_t result, + const char *err_msg, uint16_t err_msg_size) +{ + struct FragmentClosure *fcls = cls; + op = NULL; + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "message_get_latest:\t%ld\n", (long int) result); + GNUNET_assert (0 < result && fcls->n == fcls->n_expected); + + modifiers[0] = (struct GNUNET_PSYC_Modifier) { + .oper = '=', + .name = "_sync_foo", + .value = "three two one", + .value_size = sizeof ("three two one") - 1 + }; + modifiers[1] = (struct GNUNET_PSYC_Modifier) { + .oper = '=', + .name = "_sync_bar", + .value = "ten eleven twelve", + .value_size = sizeof ("ten eleven twelve") - 1 + }; + + op = GNUNET_PSYCSTORE_state_sync (h, &channel_pub_key, + GNUNET_ntohll (fcls->msg[0]->message_id) + 1, + GNUNET_ntohll (fcls->msg[0]->message_id) + 2, + 2, modifiers, state_sync_result, fcls); +} + + +static void +message_get_result (void *cls, int64_t result, + const char *err_msg, uint16_t err_msg_size) +{ + struct FragmentClosure *fcls = cls; + op = NULL; + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "message_get:\t%ld\n", (long int) result); + GNUNET_assert (0 < result && fcls->n == fcls->n_expected); + + fcls->n = 0; + fcls->n_expected = 3; + op = GNUNET_PSYCSTORE_message_get_latest (h, &channel_pub_key, &slave_pub_key, + 1, "", &fragment_result, + &message_get_latest_result, fcls); +} + + +static void +message_get_fragment_result (void *cls, int64_t result, + const char *err_msg, uint16_t err_msg_size) +{ + struct FragmentClosure *fcls = cls; + op = NULL; + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "message_get_fragment:\t%ld\n", (long int) result); + GNUNET_assert (0 < result && fcls->n == fcls->n_expected); + + fcls->n = 0; + fcls->n_expected = 3; + uint64_t message_id = GNUNET_ntohll (fcls->msg[0]->message_id); + op = GNUNET_PSYCSTORE_message_get (h, &channel_pub_key, &slave_pub_key, + message_id, message_id, 0, "", + &fragment_result, + &message_get_result, fcls); +} + + +static void +fragment_get_latest_result (void *cls, int64_t result, + const char *err_msg, uint16_t err_msg_size) +{ + struct FragmentClosure *fcls = cls; + op = NULL; + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "fragment_get_latest:\t%ld\n", (long int) result); + GNUNET_assert (0 < result && fcls->n == fcls->n_expected); + + fcls->n = 1; + fcls->n_expected = 2; + op = GNUNET_PSYCSTORE_message_get_fragment (h, &channel_pub_key, &slave_pub_key, + GNUNET_ntohll (fcls->msg[1]->message_id), + GNUNET_ntohll (fcls->msg[1]->fragment_offset), + &fragment_result, + &message_get_fragment_result, fcls); +} + + +static void +fragment_get_result (void *cls, int64_t result, + const char *err_msg, uint16_t err_msg_size) +{ + struct FragmentClosure *fcls = cls; + op = NULL; + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "fragment_get:\t%d\n", + (int) result); + GNUNET_assert (0 < result && fcls->n == fcls->n_expected); + + fcls->n = 0; + fcls->n_expected = 3; + op = GNUNET_PSYCSTORE_fragment_get_latest (h, &channel_pub_key, + &slave_pub_key, fcls->n_expected, + &fragment_result, + &fragment_get_latest_result, fcls); +} + + +static void +fragment_store_result (void *cls, int64_t result, + const char *err_msg, uint16_t err_msg_size) +{ + op = NULL; + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "fragment_store:\t%ld\n", (long int) result); + GNUNET_assert (GNUNET_OK == result); + + if ((intptr_t) cls == GNUNET_YES) + { /* last fragment */ + fcls.n = 0; + fcls.n_expected = 1; + uint64_t fragment_id = GNUNET_ntohll (fcls.msg[0]->fragment_id); + op = GNUNET_PSYCSTORE_fragment_get (h, &channel_pub_key, &slave_pub_key, + fragment_id, fragment_id, + &fragment_result, + &fragment_get_result, &fcls); + } +} + + +static void +fragment_store () +{ + struct GNUNET_MULTICAST_MessageHeader *msg; + fcls.flags[0] = GNUNET_PSYCSTORE_MESSAGE_STATE; + fcls.msg[0] = msg = GNUNET_malloc (sizeof (*msg) + sizeof (channel_pub_key)); + GNUNET_assert (msg != NULL); + + msg->header.type = htons (GNUNET_MESSAGE_TYPE_MULTICAST_MESSAGE); + msg->header.size = htons (sizeof (*msg) + sizeof (channel_pub_key)); + + msg->hop_counter = htonl (9); + msg->fragment_id = GNUNET_htonll (INT64_MAX - 8); + msg->fragment_offset = GNUNET_htonll (0); + msg->message_id = GNUNET_htonll (INT64_MAX - 10); + msg->group_generation = GNUNET_htonll (INT64_MAX - 3); + msg->flags = htonl (GNUNET_MULTICAST_MESSAGE_LAST_FRAGMENT); + + GNUNET_memcpy (&msg[1], &channel_pub_key, sizeof (channel_pub_key)); + + msg->purpose.size = htonl (ntohs (msg->header.size) + - sizeof (msg->header) + - sizeof (msg->hop_counter) + - sizeof (msg->signature)); + msg->purpose.purpose = htonl (234); + GNUNET_assert (GNUNET_OK == GNUNET_CRYPTO_eddsa_sign (channel_key, &msg->purpose, + &msg->signature)); + + op = GNUNET_PSYCSTORE_fragment_store (h, &channel_pub_key, msg, fcls.flags[0], + &fragment_store_result, GNUNET_NO); + + fcls.flags[1] = GNUNET_PSYCSTORE_MESSAGE_STATE_APPLIED; + fcls.msg[1] = msg = GNUNET_malloc (sizeof (*msg) + sizeof (channel_pub_key)); + GNUNET_memcpy (msg, fcls.msg[0], sizeof (*msg) + sizeof (channel_pub_key)); + msg->fragment_id = GNUNET_htonll (INT64_MAX - 4); + msg->fragment_offset = GNUNET_htonll (1024); + + op = GNUNET_PSYCSTORE_fragment_store (h, &channel_pub_key, msg, fcls.flags[1], + &fragment_store_result, GNUNET_NO); + + fcls.flags[2] = GNUNET_PSYCSTORE_MESSAGE_STATE_HASH; + fcls.msg[2] = msg = GNUNET_malloc (sizeof (*msg) + sizeof (channel_pub_key)); + GNUNET_memcpy (msg, fcls.msg[1], sizeof (*msg) + sizeof (channel_pub_key)); + msg->fragment_id = GNUNET_htonll (INT64_MAX); + msg->fragment_offset = GNUNET_htonll (16384); + + op = GNUNET_PSYCSTORE_fragment_store (h, &channel_pub_key, msg, fcls.flags[2], + &fragment_store_result, (void *) GNUNET_YES); +} + + +static void +membership_test_result (void *cls, int64_t result, + const char *err_msg, uint16_t err_msg_size) +{ + op = NULL; + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "membership_test:\t%ld\n", (long int) result); + GNUNET_assert (GNUNET_OK == result); + + fragment_store (); +} + + +static void +membership_store_result (void *cls, int64_t result, + const char *err_msg, uint16_t err_msg_size) +{ + op = NULL; + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "membership_store:\t%ld\n", (long int) result); + GNUNET_assert (GNUNET_OK == result); + + op = GNUNET_PSYCSTORE_membership_test (h, &channel_pub_key, &slave_pub_key, + INT64_MAX - 10, 2, + &membership_test_result, NULL); +} + + +/** + * Main function of the test, run from scheduler. + * + * @param cls NULL + * @param cfg configuration we use (also to connect to PSYCstore service) + * @param peer handle to access more of the peer (not used) + */ +static void +#if DEBUG_TEST_PSYCSTORE +run (void *cls, char *const *args, const char *cfgfile, + const struct GNUNET_CONFIGURATION_Handle *cfg) +#else +run (void *cls, + const struct GNUNET_CONFIGURATION_Handle *cfg, + struct GNUNET_TESTING_Peer *peer) +#endif +{ + end_badly_task = GNUNET_SCHEDULER_add_delayed (TIMEOUT, &end_badly, NULL); + + h = GNUNET_PSYCSTORE_connect (cfg); + GNUNET_assert (NULL != h); + + channel_key = GNUNET_CRYPTO_eddsa_key_create (); + slave_key = GNUNET_CRYPTO_ecdsa_key_create (); + + GNUNET_CRYPTO_eddsa_key_get_public (channel_key, &channel_pub_key); + GNUNET_CRYPTO_ecdsa_key_get_public (slave_key, &slave_pub_key); + + op = GNUNET_PSYCSTORE_membership_store (h, &channel_pub_key, &slave_pub_key, + GNUNET_YES, INT64_MAX - 5, + INT64_MAX - 10, 2, + &membership_store_result, NULL); +} + + +int +main (int argc, char *argv[]) +{ + res = 1; +#if DEBUG_TEST_PSYCSTORE + const struct GNUNET_GETOPT_CommandLineOption opts[] = { + GNUNET_GETOPT_OPTION_END + }; + if (GNUNET_OK != GNUNET_PROGRAM_run (argc, argv, "test-psycstore", + "test-psycstore [options]", + opts, &run, NULL)) + return 1; +#else + if (0 != GNUNET_TESTING_service_run ("test-psycstore", "psycstore", + "test_psycstore.conf", &run, NULL)) + return 1; +#endif + return res; +} + +/* end of test_psycstore.c */ diff --git a/src/psycstore/test_psycstore.conf b/src/psycstore/test_psycstore.conf new file mode 100644 index 0000000..fa7c2d0 --- /dev/null +++ b/src/psycstore/test_psycstore.conf @@ -0,0 +1,8 @@ +[PATHS] +GNUNET_TEST_HOME = $GNUNET_TMP/test-gnunet-psycstore/ + +[psycstore] +DATABASE = sqlite + +[psycstore-sqlite] +FILENAME = $GNUNET_TEST_HOME/psycstore/sqlite.db |