From 45532e7bc19a2a73ad6da6502969c536d6831436 Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Wed, 25 Aug 2021 12:52:12 +0200 Subject: further simplify libgnunetpq event API and implementation, also trigger events on 'loopback' that are not socket-activated --- src/pq/pq_event.c | 109 ++++++++++++++++++++++++++++++++---------------------- 1 file changed, 65 insertions(+), 44 deletions(-) (limited to 'src/pq/pq_event.c') diff --git a/src/pq/pq_event.c b/src/pq/pq_event.c index 3a0bfcde3..79a2e80c6 100644 --- a/src/pq/pq_event.c +++ b/src/pq/pq_event.c @@ -105,6 +105,24 @@ sh_to_channel (struct GNUNET_ShortHashCode *sh, } +/** + * Convert @a sh to a Postgres identifier. + * + * @param identifier to convert + * @param[out] sh set to short hash + * @return #GNUNET_OK on success + */ +static enum GNUNET_GenericReturnValue +channel_to_sh (const char *identifier, + struct GNUNET_ShortHashCode *sh) +{ + return GNUNET_STRINGS_string_to_data (identifier, + strlen (identifier), + sh, + sizeof (*sh)); +} + + /** * Convert @a es to a Postgres identifier. * @@ -167,11 +185,13 @@ do_notify (void *cls, } -void -GNUNET_PQ_event_do_poll (struct GNUNET_PQ_Context *db) +static void +event_do_poll (struct GNUNET_PQ_Context *db) { PGnotify *n; + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "PG poll job active\n"); if (1 != PQconsumeInput (db->conn)) GNUNET_log (GNUNET_ERROR_TYPE_ERROR, @@ -193,10 +213,8 @@ GNUNET_PQ_event_do_poll (struct GNUNET_PQ_Context *db) continue; } if (GNUNET_OK != - GNUNET_STRINGS_string_to_data (&n->relname[1], - strlen (&n->relname[1]), - &sh, - sizeof (sh))) + channel_to_sh (&n->relname[1], + &sh)) { GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Ignoring notification for unsupported channel identifier `%s'\n", @@ -218,9 +236,15 @@ GNUNET_PQ_event_do_poll (struct GNUNET_PQ_Context *db) PQfreemem (n); continue; } - GNUNET_CONTAINER_multishortmap_iterate (db->channel_map, - &do_notify, - &ctx); + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Received notification %s with extra data `%.*s'\n", + n->relname, + (int) ctx.extra_size, + (const char *) ctx.extra); + GNUNET_CONTAINER_multishortmap_get_multiple (db->channel_map, + &sh, + &do_notify, + &ctx); GNUNET_free (ctx.extra); PQfreemem (n); } @@ -238,9 +262,11 @@ do_scheduler_notify (void *cls) { struct GNUNET_PQ_Context *db = cls; - GNUNET_assert (db->scheduler_on); + db->event_task = NULL; GNUNET_assert (NULL != db->rfd); - GNUNET_PQ_event_do_poll (db); + event_do_poll (db); + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Resubscribing\n"); db->event_task = GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_FOREVER_REL, db->rfd, @@ -262,6 +288,9 @@ scheduler_fd_cb (void *cls, { struct GNUNET_PQ_Context *db = cls; + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "New poll FD is %d\n", + fd); if (NULL != db->event_task) { GNUNET_SCHEDULER_cancel (db->event_task); @@ -272,6 +301,9 @@ scheduler_fd_cb (void *cls, return; if (0 == GNUNET_CONTAINER_multishortmap_size (db->channel_map)) return; + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Activating poll job on %d\n", + fd); db->rfd = GNUNET_NETWORK_socket_box_native (fd); db->event_task = GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_ZERO, @@ -281,31 +313,6 @@ scheduler_fd_cb (void *cls, } -void -GNUNET_PQ_event_scheduler_start_ (struct GNUNET_PQ_Context *db) -{ - if (db->scheduler_on) - return; - db->scheduler_on = true; - scheduler_fd_cb (db, - PQsocket (db->conn)); -} - - -void -GNUNET_PQ_event_scheduler_stop_ (struct GNUNET_PQ_Context *db) -{ - GNUNET_assert (db->scheduler_on); - GNUNET_free (db->rfd); - db->scheduler_on = false; - if (NULL != db->event_task) - { - GNUNET_SCHEDULER_cancel (db->event_task); - db->event_task = NULL; - } -} - - /** * Helper function to trigger an SQL @a cmd on @a db * @@ -376,8 +383,9 @@ void GNUNET_PQ_event_reconnect_ (struct GNUNET_PQ_Context *db, int fd) { - if (! db->scheduler_on) - return; + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Change in PQ event FD to %d\n", + fd); scheduler_fd_cb (db, fd); GNUNET_CONTAINER_multishortmap_iterate (db->channel_map, @@ -412,7 +420,6 @@ GNUNET_PQ_event_listen (struct GNUNET_PQ_Context *db, void *cb_cls) { struct GNUNET_DB_EventHandler *eh; - bool was_zero; eh = GNUNET_new (struct GNUNET_DB_EventHandler); eh->db = db; @@ -420,14 +427,18 @@ GNUNET_PQ_event_listen (struct GNUNET_PQ_Context *db, &eh->sh); eh->cb = cb; eh->cb_cls = cb_cls; - was_zero = (0 == GNUNET_CONTAINER_multishortmap_size (db->channel_map)); GNUNET_assert (GNUNET_OK == GNUNET_CONTAINER_multishortmap_put (db->channel_map, &eh->sh, eh, GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE)); - if (was_zero) - GNUNET_PQ_event_scheduler_start_ (db); + if (NULL == db->event_task) + { + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Starting event scheduler\n"); + scheduler_fd_cb (db, + PQsocket (db->conn)); + } manage_subscribe (db, "LISTEN X", eh); @@ -447,13 +458,19 @@ GNUNET_PQ_event_listen_cancel (struct GNUNET_DB_EventHandler *eh) GNUNET_CONTAINER_multishortmap_remove (db->channel_map, &eh->sh, eh)); - manage_subscribe (db, "UNLISTEN X", eh); if (0 == GNUNET_CONTAINER_multishortmap_size (db->channel_map)) { - GNUNET_PQ_event_scheduler_stop_ (db); + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Stopping PQ event scheduler job\n"); + GNUNET_free (db->rfd); + if (NULL != db->event_task) + { + GNUNET_SCHEDULER_cancel (db->event_task); + db->event_task = NULL; + } } if (NULL != eh->timeout_task) { @@ -488,6 +505,9 @@ GNUNET_PQ_event_notify (struct GNUNET_PQ_Context *db, *end = '\0'; end = stpcpy (end, "'"); + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Executing command `%s'\n", + sql); result = PQexec (db->conn, sql); if (PGRES_COMMAND_OK != PQresultStatus (result)) @@ -505,6 +525,7 @@ GNUNET_PQ_event_notify (struct GNUNET_PQ_Context *db, PQerrorMessage (db->conn)); } PQclear (result); + event_do_poll (db); } -- cgit v1.2.3