/* This file is part of GNUnet. Copyright (C) 2014, 2015, 2016 GNUnet e.V. GNUnet is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. GNUnet is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. You should have received a copy of the GNU Affero General Public License along with this program. If not, see . SPDX-License-Identifier: AGPL3.0-or-later */ /** * @file peerstore/gnunet-service-peerstore.c * @brief peerstore service implementation * @author Omar Tarabai */ #include "platform.h" #include "gnunet_util_lib.h" #include "peerstore.h" #include "gnunet_peerstore_plugin.h" #include "peerstore_common.h" /** * Interval for expired records cleanup (in seconds) */ #define EXPIRED_RECORDS_CLEANUP_INTERVAL 300 /* 5mins */ /** * Our configuration. */ static const struct GNUNET_CONFIGURATION_Handle *cfg; /** * Database plugin library name */ static char *db_lib_name; /** * Database handle */ static struct GNUNET_PEERSTORE_PluginFunctions *db; /** * Hashmap with all watch requests */ static struct GNUNET_CONTAINER_MultiHashMap *watchers; /** * Task run to clean up expired records. */ static struct GNUNET_SCHEDULER_Task *expire_task; /** * Are we in the process of shutting down the service? #GNUNET_YES / #GNUNET_NO */ static int in_shutdown; /** * Number of connected clients. */ static unsigned int num_clients; /** * Perform the actual shutdown operations */ static void do_shutdown () { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Shutting down peerstore, bye.\n"); if (NULL != db_lib_name) { GNUNET_break (NULL == GNUNET_PLUGIN_unload (db_lib_name, db)); GNUNET_free (db_lib_name); db_lib_name = NULL; } if (NULL != watchers) { GNUNET_CONTAINER_multihashmap_destroy (watchers); watchers = NULL; } if (NULL != expire_task) { GNUNET_SCHEDULER_cancel (expire_task); expire_task = NULL; } GNUNET_SCHEDULER_shutdown (); } /** * Task run during shutdown. * * @param cls unused */ static void shutdown_task (void *cls) { GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Priming PEERSTORE for shutdown.\n"); in_shutdown = GNUNET_YES; if (0 == num_clients) /* Only when no connected clients. */ do_shutdown (); } /* Forward declaration */ static void expire_records_continuation (void *cls, int success); /** * Deletes any expired records from storage */ static void cleanup_expired_records (void *cls) { int ret; expire_task = NULL; GNUNET_assert (NULL != db); ret = db->expire_records (db->cls, GNUNET_TIME_absolute_get (), &expire_records_continuation, NULL); if (GNUNET_OK != ret) { GNUNET_assert (NULL == expire_task); expire_task = GNUNET_SCHEDULER_add_delayed ( GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, EXPIRED_RECORDS_CLEANUP_INTERVAL), &cleanup_expired_records, NULL); } } /** * Continuation to expire_records called by the peerstore plugin * * @param cls unused * @param success count of records deleted or #GNUNET_SYSERR */ static void expire_records_continuation (void *cls, int success) { if (success > 0) GNUNET_log (GNUNET_ERROR_TYPE_INFO, "%d records expired.\n", success); GNUNET_assert (NULL == expire_task); expire_task = GNUNET_SCHEDULER_add_delayed ( GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_SECONDS, EXPIRED_RECORDS_CLEANUP_INTERVAL), &cleanup_expired_records, NULL); } /** * A client disconnected. Remove all of its data structure entries. * * @param cls closure, NULL * @param client identification of the client * @param mq the message queue * @return */ static void * client_connect_cb (void *cls, struct GNUNET_SERVICE_Client *client, struct GNUNET_MQ_Handle *mq) { num_clients++; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "A client connected (now %u)\n", num_clients); return client; } /** * Search for a disconnected client and remove it * * @param cls closuer, a `struct GNUNET_SERVICE_Client` * @param key hash of record key * @param value the watcher client, a `struct GNUNET_SERVICE_Client *` * @return #GNUNET_OK to continue iterating */ static int client_disconnect_it (void *cls, const struct GNUNET_HashCode *key, void *value) { if (value == cls) { GNUNET_assert (GNUNET_YES == GNUNET_CONTAINER_multihashmap_remove (watchers, key, value)); num_clients++; /* Watchers do not count */ } return GNUNET_OK; } /** * A client disconnected. Remove all of its data structure entries. * * @param cls closure, NULL * @param client identification of the client */ static void client_disconnect_cb (void *cls, struct GNUNET_SERVICE_Client *client, void *app_cls) { num_clients--; if (NULL != watchers) GNUNET_CONTAINER_multihashmap_iterate (watchers, &client_disconnect_it, client); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "A client disconnected (%u remaining).\n", num_clients); if ((0 == num_clients) && in_shutdown) do_shutdown (); } /** * Function called by for each matching record. * * @param cls closure * @param record peerstore record found * @param emsg error message or NULL if no errors * @return #GNUNET_YES to continue iteration */ static void record_iterator (void *cls, const struct GNUNET_PEERSTORE_Record *record, const char *emsg) { struct GNUNET_PEERSTORE_Record *cls_record = cls; struct GNUNET_MQ_Envelope *env; if (NULL == record) { /* No more records */ struct GNUNET_MessageHeader *endmsg; env = GNUNET_MQ_msg (endmsg, GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_END); GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (cls_record->client), env); if (NULL == emsg) { GNUNET_SERVICE_client_continue (cls_record->client); } else { GNUNET_break (0); GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Failed to iterate: %s\n", emsg); GNUNET_SERVICE_client_drop (cls_record->client); } PEERSTORE_destroy_record (cls_record); return; } env = PEERSTORE_create_record_mq_envelope ( record->sub_system, &record->peer, record->key, record->value, record->value_size, record->expiry, 0, GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE_RECORD); GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (cls_record->client), env); } /** * Iterator over all watcher clients * to notify them of a new record * * @param cls closure, a `struct GNUNET_PEERSTORE_Record *` * @param key hash of record key * @param value the watcher client, a `struct GNUNET_SERVICE_Client *` * @return #GNUNET_YES to continue iterating */ static int watch_notifier_it (void *cls, const struct GNUNET_HashCode *key, void *value) { struct GNUNET_PEERSTORE_Record *record = cls; struct GNUNET_SERVICE_Client *client = value; struct GNUNET_MQ_Envelope *env; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Found a watcher to update.\n"); env = PEERSTORE_create_record_mq_envelope ( record->sub_system, &record->peer, record->key, record->value, record->value_size, record->expiry, 0, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_RECORD); GNUNET_MQ_send (GNUNET_SERVICE_client_get_mq (client), env); return GNUNET_YES; } /** * Given a new record, notifies watchers * * @param record changed record to update watchers with */ static void watch_notifier (struct GNUNET_PEERSTORE_Record *record) { struct GNUNET_HashCode keyhash; PEERSTORE_hash_key (record->sub_system, &record->peer, record->key, &keyhash); GNUNET_CONTAINER_multihashmap_get_multiple (watchers, &keyhash, &watch_notifier_it, record); } /** * Handle a watch cancel request from client * * @param cls identification of the client * @param hm the actual message */ static void handle_watch_cancel (void *cls, const struct StoreKeyHashMessage *hm) { struct GNUNET_SERVICE_Client *client = cls; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Received a watch cancel request.\n"); if (GNUNET_OK != GNUNET_CONTAINER_multihashmap_remove (watchers, &hm->keyhash, client)) { GNUNET_break (0); GNUNET_SERVICE_client_drop (client); return; } num_clients++; GNUNET_SERVICE_client_continue (client); } /** * Handle a watch request from client * * @param cls identification of the client * @param hm the actual message */ static void handle_watch (void *cls, const struct StoreKeyHashMessage *hm) { struct GNUNET_SERVICE_Client *client = cls; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Received a watch request.\n"); num_clients--; /* do not count watchers */ GNUNET_SERVICE_client_mark_monitor (client); GNUNET_CONTAINER_multihashmap_put (watchers, &hm->keyhash, client, GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE); GNUNET_SERVICE_client_continue (client); } /** * Check an iterate request from client * * @param cls client identification of the client * @param srm the actual message * @return #GNUNET_OK if @a srm is well-formed */ static int check_iterate (void *cls, const struct StoreRecordMessage *srm) { struct GNUNET_PEERSTORE_Record *record; record = PEERSTORE_parse_record_message (srm); if (NULL == record) { GNUNET_break (0); return GNUNET_SYSERR; } if (NULL == record->sub_system) { GNUNET_break (0); PEERSTORE_destroy_record (record); return GNUNET_SYSERR; } PEERSTORE_destroy_record (record); return GNUNET_OK; } /** * Handle an iterate request from client * * @param cls identification of the client * @param srm the actual message */ static void handle_iterate (void *cls, const struct StoreRecordMessage *srm) { struct GNUNET_SERVICE_Client *client = cls; struct GNUNET_PEERSTORE_Record *record; record = PEERSTORE_parse_record_message (srm); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Iterate request: ss `%s', peer `%s', key `%s'\n", record->sub_system, GNUNET_i2s (&record->peer), (NULL == record->key) ? "NULL" : record->key); record->client = client; if (GNUNET_OK != db->iterate_records (db->cls, record->sub_system, (ntohs (srm->peer_set)) ? &record->peer : NULL, record->key, &record_iterator, record)) { GNUNET_break (0); GNUNET_SERVICE_client_drop (client); PEERSTORE_destroy_record (record); } } /** * Continuation of store_record called by the peerstore plugin * * @param cls closure * @param success result */ static void store_record_continuation (void *cls, int success) { struct GNUNET_PEERSTORE_Record *record = cls; if (GNUNET_OK == success) { watch_notifier (record); GNUNET_SERVICE_client_continue (record->client); } else { GNUNET_break (0); GNUNET_SERVICE_client_drop (record->client); } PEERSTORE_destroy_record (record); } /** * Check a store request from client * * @param cls client identification of the client * @param srm the actual message * @return #GNUNET_OK if @a srm is well-formed */ static int check_store (void *cls, const struct StoreRecordMessage *srm) { struct GNUNET_PEERSTORE_Record *record; record = PEERSTORE_parse_record_message (srm); if (NULL == record) { GNUNET_break (0); return GNUNET_SYSERR; } if ((NULL == record->sub_system) || (NULL == record->key)) { GNUNET_break (0); PEERSTORE_destroy_record (record); return GNUNET_SYSERR; } PEERSTORE_destroy_record (record); return GNUNET_OK; } /** * Handle a store request from client * * @param cls client identification of the client * @param srm the actual message */ static void handle_store (void *cls, const struct StoreRecordMessage *srm) { struct GNUNET_SERVICE_Client *client = cls; struct GNUNET_PEERSTORE_Record *record; record = PEERSTORE_parse_record_message (srm); GNUNET_log ( GNUNET_ERROR_TYPE_INFO, "Received a store request. Sub system `%s' Peer `%s Key `%s' Options: %u.\n", record->sub_system, GNUNET_i2s (&record->peer), record->key, (uint32_t) ntohl (srm->options)); record->client = client; if (GNUNET_OK != db->store_record (db->cls, record->sub_system, &record->peer, record->key, record->value, record->value_size, record->expiry, ntohl (srm->options), &store_record_continuation, record)) { GNUNET_break (0); PEERSTORE_destroy_record (record); GNUNET_SERVICE_client_drop (client); return; } } /** * Peerstore service runner. * * @param cls closure * @param c configuration to use * @param service the initialized service */ static void run (void *cls, const struct GNUNET_CONFIGURATION_Handle *c, struct GNUNET_SERVICE_Handle *service) { char *database; in_shutdown = GNUNET_NO; num_clients = 0; cfg = c; if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_string (cfg, "peerstore", "DATABASE", &database)) { GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR, "peerstore", "DATABASE"); GNUNET_SCHEDULER_shutdown (); return; } GNUNET_asprintf (&db_lib_name, "libgnunet_plugin_peerstore_%s", database); db = GNUNET_PLUGIN_load (db_lib_name, (void *) cfg); GNUNET_free (database); if (NULL == db) { GNUNET_log (GNUNET_ERROR_TYPE_ERROR, _ ("Could not load database backend `%s'\n"), db_lib_name); GNUNET_SCHEDULER_shutdown (); return; } watchers = GNUNET_CONTAINER_multihashmap_create (10, GNUNET_NO); expire_task = GNUNET_SCHEDULER_add_now (&cleanup_expired_records, NULL); GNUNET_SCHEDULER_add_shutdown (&shutdown_task, NULL); } /** * Define "main" method using service macro. */ GNUNET_SERVICE_MAIN ( "peerstore", GNUNET_SERVICE_OPTION_SOFT_SHUTDOWN, &run, &client_connect_cb, &client_disconnect_cb, NULL, GNUNET_MQ_hd_var_size (store, GNUNET_MESSAGE_TYPE_PEERSTORE_STORE, struct StoreRecordMessage, NULL), GNUNET_MQ_hd_var_size (iterate, GNUNET_MESSAGE_TYPE_PEERSTORE_ITERATE, struct StoreRecordMessage, NULL), GNUNET_MQ_hd_fixed_size (watch, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH, struct StoreKeyHashMessage, NULL), GNUNET_MQ_hd_fixed_size (watch_cancel, GNUNET_MESSAGE_TYPE_PEERSTORE_WATCH_CANCEL, struct StoreKeyHashMessage, NULL), GNUNET_MQ_handler_end ()); /* end of gnunet-service-peerstore.c */