From ea901fb4978ee7e9cfd2f74c810f2146bdf9d46b Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Sun, 22 Aug 2021 23:59:54 +0200 Subject: -simplify libgnunetpq to only support single-threaded applications that do use the scheudler (when using event API) --- src/include/gnunet_pq_lib.h | 49 +-------------- src/pq/pq.h | 40 ++++++------ src/pq/pq_connect.c | 17 ++---- src/pq/pq_event.c | 145 ++++++++++++++++++++------------------------ src/pq/test_pq.c | 64 +------------------ src/util/os_priority.c | 4 +- 6 files changed, 101 insertions(+), 218 deletions(-) diff --git a/src/include/gnunet_pq_lib.h b/src/include/gnunet_pq_lib.h index ecc2b9719..ff4498938 100644 --- a/src/include/gnunet_pq_lib.h +++ b/src/include/gnunet_pq_lib.h @@ -852,33 +852,6 @@ void GNUNET_PQ_reconnect (struct GNUNET_PQ_Context *db); -/** - * Function called whenever the socket needed for - * notifications from postgres changes. - * - * @param cls closure - * @param fd socket to listen on, -1 for none - */ -typedef void -(*GNUNET_PQ_SocketCallback)(void *cls, - int fd); - - -/** - * Obtain the file descriptor to poll on for notifications. - * Useful if the GNUnet scheduler is NOT to be used for - * such notifications. - * - * @param db database handle - * @param sc function to call with the socket - * @param sc_cls closure for @a sc - */ -void -GNUNET_PQ_event_set_socket_callback (struct GNUNET_PQ_Context *db, - GNUNET_PQ_SocketCallback sc, - void *sc_cls); - - /** * Poll for database events now. Used if the event FD * is ready and the application wants to trigger applicable @@ -892,24 +865,6 @@ void GNUNET_PQ_event_do_poll (struct GNUNET_PQ_Context *db); -/** - * Run poll event loop using the GNUnet scheduler. - * - * @param db database handle - */ -void -GNUNET_PQ_event_scheduler_start (struct GNUNET_PQ_Context *db); - - -/** - * Stop running poll event loop using the GNUnet scheduler. - * - * @param db database handle - */ -void -GNUNET_PQ_event_scheduler_stop (struct GNUNET_PQ_Context *db); - - /** * Register callback to be invoked on events of type @a es. * @@ -921,14 +876,16 @@ GNUNET_PQ_event_scheduler_stop (struct GNUNET_PQ_Context *db); * * @param db database context to use * @param es specification of the event to listen for + * @param timeout when to trigger @a cb based on timeout * @param cb function to call when the event happens, possibly - * multiple times (until #GNUNET_PQ_event_listen_cancel() is invoked) + * multiple times (until #GNUNET_PQ_event_listen_cancel() is invoked), including on timeout * @param cb_cls closure for @a cb * @return handle useful to cancel the listener */ struct GNUNET_DB_EventHandler * GNUNET_PQ_event_listen (struct GNUNET_PQ_Context *db, const struct GNUNET_DB_EventHeaderP *es, + struct GNUNET_TIME_Relative timeout, GNUNET_DB_EventCallback cb, void *cb_cls); diff --git a/src/pq/pq.h b/src/pq/pq.h index 107fd116c..950d38220 100644 --- a/src/pq/pq.h +++ b/src/pq/pq.h @@ -28,6 +28,7 @@ #include "gnunet_util_lib.h" #include "gnunet_pq_lib.h" + /** * Handle to Postgres database. */ @@ -58,26 +59,11 @@ struct GNUNET_PQ_Context */ char *load_path; - /** - * Function to call on Postgres FDs. - */ - GNUNET_PQ_SocketCallback sc; - - /** - * Closure for @e sc. - */ - void *sc_cls; - /** * Map managing event subscriptions. */ struct GNUNET_CONTAINER_MultiShortmap *channel_map; - /** - * Lock to access @e channel_map. - */ - pthread_mutex_t notify_lock; - /** * Task responsible for processing events. */ @@ -87,7 +73,7 @@ struct GNUNET_PQ_Context * File descriptor wrapper for @e event_task. */ struct GNUNET_NETWORK_Handle *rfd; - + /** * Is scheduling via the GNUnet scheduler desired? */ @@ -100,9 +86,29 @@ struct GNUNET_PQ_Context * after a disconnect. * * @param db the DB handle + * @param fd socket to listen on + */ +void +GNUNET_PQ_event_reconnect_ (struct GNUNET_PQ_Context *db, + int fd); + + +/** + * Run poll event loop using the GNUnet scheduler. + * + * @param db database handle + */ +void +GNUNET_PQ_event_scheduler_start_ (struct GNUNET_PQ_Context *db); + + +/** + * Stop running poll event loop using the GNUnet scheduler. + * + * @param db database handle */ void -GNUNET_PQ_event_reconnect_ (struct GNUNET_PQ_Context *db); +GNUNET_PQ_event_scheduler_stop_ (struct GNUNET_PQ_Context *db); #endif diff --git a/src/pq/pq_connect.c b/src/pq/pq_connect.c index 275fd7450..05e787939 100644 --- a/src/pq/pq_connect.c +++ b/src/pq/pq_connect.c @@ -103,9 +103,6 @@ GNUNET_PQ_connect (const char *config_str, } db->channel_map = GNUNET_CONTAINER_multishortmap_create (16, GNUNET_YES); - GNUNET_assert (0 == - pthread_mutex_init (&db->notify_lock, - NULL)); GNUNET_PQ_reconnect (db); if (NULL == db->conn) { @@ -294,9 +291,8 @@ GNUNET_PQ_reconnect_if_down (struct GNUNET_PQ_Context *db) void GNUNET_PQ_reconnect (struct GNUNET_PQ_Context *db) { - if (NULL != db->sc) - db->sc (db->sc_cls, - -1); + GNUNET_PQ_event_reconnect_ (db, + -1); if (NULL != db->conn) PQfinish (db->conn); db->conn = PQconnectdb (db->config_str); @@ -416,11 +412,8 @@ GNUNET_PQ_reconnect (struct GNUNET_PQ_Context *db) db->conn = NULL; return; } - GNUNET_PQ_event_reconnect_ (db); - if ( (NULL != db->sc) && - (0 != GNUNET_CONTAINER_multishortmap_size (db->channel_map)) ) - db->sc (db->sc_cls, - PQsocket (db->conn)); + GNUNET_PQ_event_reconnect_ (db, + PQsocket (db->conn)); } @@ -473,8 +466,6 @@ GNUNET_PQ_disconnect (struct GNUNET_PQ_Context *db) GNUNET_assert (0 == GNUNET_CONTAINER_multishortmap_size (db->channel_map)); GNUNET_CONTAINER_multishortmap_destroy (db->channel_map); - GNUNET_assert (0 == - pthread_mutex_destroy (&db->notify_lock)); GNUNET_free (db->es); GNUNET_free (db->ps); GNUNET_free (db->load_path); diff --git a/src/pq/pq_event.c b/src/pq/pq_event.c index 2890869a3..3a0bfcde3 100644 --- a/src/pq/pq_event.c +++ b/src/pq/pq_event.c @@ -51,6 +51,11 @@ struct GNUNET_DB_EventHandler * Database context this event handler is with. */ struct GNUNET_PQ_Context *db; + + /** + * Task to run on timeout. + */ + struct GNUNET_SCHEDULER_Task *timeout_task; }; @@ -162,36 +167,11 @@ do_notify (void *cls, } -void -GNUNET_PQ_event_set_socket_callback (struct GNUNET_PQ_Context *db, - GNUNET_PQ_SocketCallback sc, - void *sc_cls) -{ - int fd; - - db->sc = sc; - db->sc_cls = sc_cls; - if (NULL == sc) - return; - GNUNET_assert (0 == - pthread_mutex_lock (&db->notify_lock)); - fd = PQsocket (db->conn); - if ( (-1 != fd) && - (0 != GNUNET_CONTAINER_multishortmap_size (db->channel_map)) ) - sc (sc_cls, - fd); - GNUNET_assert (0 == - pthread_mutex_unlock (&db->notify_lock)); -} - - void GNUNET_PQ_event_do_poll (struct GNUNET_PQ_Context *db) { PGnotify *n; - GNUNET_assert (0 == - pthread_mutex_lock (&db->notify_lock)); if (1 != PQconsumeInput (db->conn)) GNUNET_log (GNUNET_ERROR_TYPE_ERROR, @@ -204,9 +184,17 @@ GNUNET_PQ_event_do_poll (struct GNUNET_PQ_Context *db) .extra = NULL }; + if ('X' != toupper ((int) n->relname[0])) + { + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "Ignoring notification for unsupported channel identifier `%s'\n", + n->relname); + PQfreemem (n); + continue; + } if (GNUNET_OK != - GNUNET_STRINGS_string_to_data (n->relname, - strlen (n->relname), + GNUNET_STRINGS_string_to_data (&n->relname[1], + strlen (&n->relname[1]), &sh, sizeof (sh))) { @@ -236,23 +224,9 @@ GNUNET_PQ_event_do_poll (struct GNUNET_PQ_Context *db) GNUNET_free (ctx.extra); PQfreemem (n); } - GNUNET_assert (0 == - pthread_mutex_unlock (&db->notify_lock)); } -/** - * Function called when the Postgres FD changes and we need - * to update the scheduler event loop task. - * - * @param cls a `struct GNUNET_PQ_Context *` - * @param fd the file descriptor, possibly -1 - */ -static void -scheduler_fd_cb (void *cls, - int fd); - - /** * The GNUnet scheduler notifies us that we need to * trigger the DB event poller. @@ -308,27 +282,21 @@ scheduler_fd_cb (void *cls, void -GNUNET_PQ_event_scheduler_start (struct GNUNET_PQ_Context *db) +GNUNET_PQ_event_scheduler_start_ (struct GNUNET_PQ_Context *db) { - int fd; - - GNUNET_assert (! db->scheduler_on); - GNUNET_assert (NULL == db->sc); + if (db->scheduler_on) + return; db->scheduler_on = true; - db->sc = &scheduler_fd_cb; - db->sc_cls = db; - fd = PQsocket (db->conn); scheduler_fd_cb (db, - fd); + PQsocket (db->conn)); } void -GNUNET_PQ_event_scheduler_stop (struct GNUNET_PQ_Context *db) +GNUNET_PQ_event_scheduler_stop_ (struct GNUNET_PQ_Context *db) { GNUNET_assert (db->scheduler_on); GNUNET_free (db->rfd); - db->sc = NULL; db->scheduler_on = false; if (NULL != db->event_task) { @@ -338,6 +306,13 @@ GNUNET_PQ_event_scheduler_stop (struct GNUNET_PQ_Context *db) } +/** + * Helper function to trigger an SQL @a cmd on @a db + * + * @param db database to send command to + * @param cmd prefix of the command to send + * @param eh details about the event + */ static void manage_subscribe (struct GNUNET_PQ_Context *db, const char *cmd, @@ -351,6 +326,9 @@ manage_subscribe (struct GNUNET_PQ_Context *db, cmd); end = sh_to_channel (&eh->sh, end); + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Executing PQ command `%s'\n", + sql); result = PQexec (db->conn, sql); if (PGRES_COMMAND_OK != PQresultStatus (result)) @@ -395,21 +373,41 @@ register_notify (void *cls, void -GNUNET_PQ_event_reconnect_ (struct GNUNET_PQ_Context *db) +GNUNET_PQ_event_reconnect_ (struct GNUNET_PQ_Context *db, + int fd) { - GNUNET_assert (0 == - pthread_mutex_lock (&db->notify_lock)); + if (! db->scheduler_on) + return; + scheduler_fd_cb (db, + fd); GNUNET_CONTAINER_multishortmap_iterate (db->channel_map, ®ister_notify, db); - GNUNET_assert (0 == - pthread_mutex_unlock (&db->notify_lock)); +} + + +/** + * Function run on timeout for an event. Triggers + * the notification, but does NOT clear the handler. + * + * @param cls a `struct GNUNET_DB_EventHandler *` + */ +static void +event_timeout (void *cls) +{ + struct GNUNET_DB_EventHandler *eh = cls; + + eh->timeout_task = NULL; + eh->cb (eh->cb_cls, + NULL, + 0); } struct GNUNET_DB_EventHandler * GNUNET_PQ_event_listen (struct GNUNET_PQ_Context *db, const struct GNUNET_DB_EventHeaderP *es, + struct GNUNET_TIME_Relative timeout, GNUNET_DB_EventCallback cb, void *cb_cls) { @@ -422,28 +420,20 @@ GNUNET_PQ_event_listen (struct GNUNET_PQ_Context *db, &eh->sh); eh->cb = cb; eh->cb_cls = cb_cls; - GNUNET_assert (0 == - pthread_mutex_lock (&db->notify_lock)); was_zero = (0 == GNUNET_CONTAINER_multishortmap_size (db->channel_map)); GNUNET_assert (GNUNET_OK == GNUNET_CONTAINER_multishortmap_put (db->channel_map, &eh->sh, eh, GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE)); - if ( (NULL != db->sc) && - was_zero) - { - int fd = PQsocket (db->conn); - - if (-1 != fd) - db->sc (db->sc_cls, - fd); - } + if (was_zero) + GNUNET_PQ_event_scheduler_start_ (db); manage_subscribe (db, "LISTEN X", eh); - GNUNET_assert (0 == - pthread_mutex_unlock (&db->notify_lock)); + eh->timeout_task = GNUNET_SCHEDULER_add_delayed (timeout, + &event_timeout, + eh); return eh; } @@ -453,8 +443,6 @@ GNUNET_PQ_event_listen_cancel (struct GNUNET_DB_EventHandler *eh) { struct GNUNET_PQ_Context *db = eh->db; - GNUNET_assert (0 == - pthread_mutex_lock (&db->notify_lock)); GNUNET_assert (GNUNET_OK == GNUNET_CONTAINER_multishortmap_remove (db->channel_map, &eh->sh, @@ -463,14 +451,15 @@ GNUNET_PQ_event_listen_cancel (struct GNUNET_DB_EventHandler *eh) manage_subscribe (db, "UNLISTEN X", eh); - if ( (NULL != db->sc) && - (0 == GNUNET_CONTAINER_multishortmap_size (db->channel_map)) ) + if (0 == GNUNET_CONTAINER_multishortmap_size (db->channel_map)) + { + GNUNET_PQ_event_scheduler_stop_ (db); + } + if (NULL != eh->timeout_task) { - db->sc (db->sc_cls, - -1); + GNUNET_SCHEDULER_cancel (eh->timeout_task); + eh->timeout_task = NULL; } - GNUNET_assert (0 == - pthread_mutex_unlock (&db->notify_lock)); GNUNET_free (eh); } diff --git a/src/pq/test_pq.c b/src/pq/test_pq.c index ffbb4d129..90b5c6489 100644 --- a/src/pq/test_pq.c +++ b/src/pq/test_pq.c @@ -240,63 +240,6 @@ run_queries (struct GNUNET_PQ_Context *db) } -static void -event_cb (void *cls, - const void *extra, - size_t extra_size) -{ - unsigned int *cnt = cls; - - GNUNET_assert (5 == extra_size); - GNUNET_assert (0 == memcmp ("world", - extra, - 5)); - (*cnt)++; -} - - -/** - * Run subscribe/notify tests. - * - * @param db database handle - * @return 0 on success - */ -static int -test_notify (struct GNUNET_PQ_Context *db) -{ - struct GNUNET_DB_EventHeaderP e1 = { - .size = htons (sizeof (e1)), - .type = htons (1) - }; - struct GNUNET_DB_EventHeaderP e2 = { - .size = htons (sizeof (e2)), - .type = htons (2) - }; - unsigned int called = 0; - struct GNUNET_DB_EventHandler *eh; - - eh = GNUNET_PQ_event_listen (db, - &e1, - &event_cb, - &called); - GNUNET_assert (NULL != eh); - GNUNET_PQ_event_notify (db, - &e2, - "hello", - 5); - GNUNET_PQ_event_do_poll (db); - GNUNET_assert (0 == called); - GNUNET_PQ_event_notify (db, - &e1, - "world", - 5); - GNUNET_PQ_event_do_poll (db); - GNUNET_assert (1 == called); - GNUNET_PQ_event_listen_cancel (eh); - return 0; -} - - /** * Task called on shutdown. * @@ -305,7 +248,6 @@ test_notify (struct GNUNET_PQ_Context *db) static void event_end (void *cls) { - GNUNET_PQ_event_scheduler_stop (db); GNUNET_PQ_event_listen_cancel (eh); eh = NULL; if (NULL != tt) @@ -368,9 +310,9 @@ sched_tests (void *cls) tt = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS, &timeout_cb, NULL); - GNUNET_PQ_event_scheduler_start (db); eh = GNUNET_PQ_event_listen (db, &es, + GNUNET_TIME_UNIT_FOREVER_REL, &event_sched_cb, NULL); GNUNET_PQ_reconnect (db); @@ -404,7 +346,7 @@ main (int argc, }; GNUNET_log_setup ("test-pq", - "WARNING", + "INFO", NULL); db = GNUNET_PQ_connect ("postgres:///gnunetcheck", NULL, @@ -433,8 +375,6 @@ main (int argc, return 1; } ret = run_queries (db); - ret |= test_notify (db); - ret |= test_notify (db); if (0 != ret) { GNUNET_break (0); diff --git a/src/util/os_priority.c b/src/util/os_priority.c index dc2f0f97e..08320b291 100644 --- a/src/util/os_priority.c +++ b/src/util/os_priority.c @@ -47,7 +47,6 @@ struct GNUNET_OS_Process */ pid_t pid; - /** * Pipe we use to signal the process. * NULL if unused, or if process was deemed uncontrollable. @@ -301,7 +300,8 @@ GNUNET_OS_process_destroy (struct GNUNET_OS_Process *proc) * @param flags open flags (O_RDONLY, O_WRONLY) */ static void -open_dev_null (int target_fd, int flags) +open_dev_null (int target_fd, + int flags) { int fd; -- cgit v1.2.3