summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2021-08-22 23:59:54 +0200
committerChristian Grothoff <christian@grothoff.org>2021-08-22 23:59:54 +0200
commitea901fb4978ee7e9cfd2f74c810f2146bdf9d46b (patch)
treef2bbad92955ed144bc6afa9a03ece372c20a874a
parent9ef7f0704fa0458f2e27ba188aec5102dbb780b2 (diff)
-simplify libgnunetpq to only support single-threaded applications that do use the scheudler (when using event API)
-rw-r--r--src/include/gnunet_pq_lib.h49
-rw-r--r--src/pq/pq.h40
-rw-r--r--src/pq/pq_connect.c17
-rw-r--r--src/pq/pq_event.c145
-rw-r--r--src/pq/test_pq.c64
-rw-r--r--src/util/os_priority.c4
6 files changed, 101 insertions, 218 deletions
diff --git a/src/include/gnunet_pq_lib.h b/src/include/gnunet_pq_lib.h
index ecc2b9719..ff4498938 100644
--- a/src/include/gnunet_pq_lib.h
+++ b/src/include/gnunet_pq_lib.h
@@ -853,33 +853,6 @@ GNUNET_PQ_reconnect (struct GNUNET_PQ_Context *db);
/**
- * Function called whenever the socket needed for
- * notifications from postgres changes.
- *
- * @param cls closure
- * @param fd socket to listen on, -1 for none
- */
-typedef void
-(*GNUNET_PQ_SocketCallback)(void *cls,
- int fd);
-
-
-/**
- * Obtain the file descriptor to poll on for notifications.
- * Useful if the GNUnet scheduler is NOT to be used for
- * such notifications.
- *
- * @param db database handle
- * @param sc function to call with the socket
- * @param sc_cls closure for @a sc
- */
-void
-GNUNET_PQ_event_set_socket_callback (struct GNUNET_PQ_Context *db,
- GNUNET_PQ_SocketCallback sc,
- void *sc_cls);
-
-
-/**
* Poll for database events now. Used if the event FD
* is ready and the application wants to trigger applicable
* events.
@@ -893,24 +866,6 @@ GNUNET_PQ_event_do_poll (struct GNUNET_PQ_Context *db);
/**
- * Run poll event loop using the GNUnet scheduler.
- *
- * @param db database handle
- */
-void
-GNUNET_PQ_event_scheduler_start (struct GNUNET_PQ_Context *db);
-
-
-/**
- * Stop running poll event loop using the GNUnet scheduler.
- *
- * @param db database handle
- */
-void
-GNUNET_PQ_event_scheduler_stop (struct GNUNET_PQ_Context *db);
-
-
-/**
* Register callback to be invoked on events of type @a es.
*
* Unlike many other calls, this function is thread-safe
@@ -921,14 +876,16 @@ GNUNET_PQ_event_scheduler_stop (struct GNUNET_PQ_Context *db);
*
* @param db database context to use
* @param es specification of the event to listen for
+ * @param timeout when to trigger @a cb based on timeout
* @param cb function to call when the event happens, possibly
- * multiple times (until #GNUNET_PQ_event_listen_cancel() is invoked)
+ * multiple times (until #GNUNET_PQ_event_listen_cancel() is invoked), including on timeout
* @param cb_cls closure for @a cb
* @return handle useful to cancel the listener
*/
struct GNUNET_DB_EventHandler *
GNUNET_PQ_event_listen (struct GNUNET_PQ_Context *db,
const struct GNUNET_DB_EventHeaderP *es,
+ struct GNUNET_TIME_Relative timeout,
GNUNET_DB_EventCallback cb,
void *cb_cls);
diff --git a/src/pq/pq.h b/src/pq/pq.h
index 107fd116c..950d38220 100644
--- a/src/pq/pq.h
+++ b/src/pq/pq.h
@@ -28,6 +28,7 @@
#include "gnunet_util_lib.h"
#include "gnunet_pq_lib.h"
+
/**
* Handle to Postgres database.
*/
@@ -59,26 +60,11 @@ struct GNUNET_PQ_Context
char *load_path;
/**
- * Function to call on Postgres FDs.
- */
- GNUNET_PQ_SocketCallback sc;
-
- /**
- * Closure for @e sc.
- */
- void *sc_cls;
-
- /**
* Map managing event subscriptions.
*/
struct GNUNET_CONTAINER_MultiShortmap *channel_map;
/**
- * Lock to access @e channel_map.
- */
- pthread_mutex_t notify_lock;
-
- /**
* Task responsible for processing events.
*/
struct GNUNET_SCHEDULER_Task *event_task;
@@ -87,7 +73,7 @@ struct GNUNET_PQ_Context
* File descriptor wrapper for @e event_task.
*/
struct GNUNET_NETWORK_Handle *rfd;
-
+
/**
* Is scheduling via the GNUnet scheduler desired?
*/
@@ -100,9 +86,29 @@ struct GNUNET_PQ_Context
* after a disconnect.
*
* @param db the DB handle
+ * @param fd socket to listen on
+ */
+void
+GNUNET_PQ_event_reconnect_ (struct GNUNET_PQ_Context *db,
+ int fd);
+
+
+/**
+ * Run poll event loop using the GNUnet scheduler.
+ *
+ * @param db database handle
+ */
+void
+GNUNET_PQ_event_scheduler_start_ (struct GNUNET_PQ_Context *db);
+
+
+/**
+ * Stop running poll event loop using the GNUnet scheduler.
+ *
+ * @param db database handle
*/
void
-GNUNET_PQ_event_reconnect_ (struct GNUNET_PQ_Context *db);
+GNUNET_PQ_event_scheduler_stop_ (struct GNUNET_PQ_Context *db);
#endif
diff --git a/src/pq/pq_connect.c b/src/pq/pq_connect.c
index 275fd7450..05e787939 100644
--- a/src/pq/pq_connect.c
+++ b/src/pq/pq_connect.c
@@ -103,9 +103,6 @@ GNUNET_PQ_connect (const char *config_str,
}
db->channel_map = GNUNET_CONTAINER_multishortmap_create (16,
GNUNET_YES);
- GNUNET_assert (0 ==
- pthread_mutex_init (&db->notify_lock,
- NULL));
GNUNET_PQ_reconnect (db);
if (NULL == db->conn)
{
@@ -294,9 +291,8 @@ GNUNET_PQ_reconnect_if_down (struct GNUNET_PQ_Context *db)
void
GNUNET_PQ_reconnect (struct GNUNET_PQ_Context *db)
{
- if (NULL != db->sc)
- db->sc (db->sc_cls,
- -1);
+ GNUNET_PQ_event_reconnect_ (db,
+ -1);
if (NULL != db->conn)
PQfinish (db->conn);
db->conn = PQconnectdb (db->config_str);
@@ -416,11 +412,8 @@ GNUNET_PQ_reconnect (struct GNUNET_PQ_Context *db)
db->conn = NULL;
return;
}
- GNUNET_PQ_event_reconnect_ (db);
- if ( (NULL != db->sc) &&
- (0 != GNUNET_CONTAINER_multishortmap_size (db->channel_map)) )
- db->sc (db->sc_cls,
- PQsocket (db->conn));
+ GNUNET_PQ_event_reconnect_ (db,
+ PQsocket (db->conn));
}
@@ -473,8 +466,6 @@ GNUNET_PQ_disconnect (struct GNUNET_PQ_Context *db)
GNUNET_assert (0 ==
GNUNET_CONTAINER_multishortmap_size (db->channel_map));
GNUNET_CONTAINER_multishortmap_destroy (db->channel_map);
- GNUNET_assert (0 ==
- pthread_mutex_destroy (&db->notify_lock));
GNUNET_free (db->es);
GNUNET_free (db->ps);
GNUNET_free (db->load_path);
diff --git a/src/pq/pq_event.c b/src/pq/pq_event.c
index 2890869a3..3a0bfcde3 100644
--- a/src/pq/pq_event.c
+++ b/src/pq/pq_event.c
@@ -51,6 +51,11 @@ struct GNUNET_DB_EventHandler
* Database context this event handler is with.
*/
struct GNUNET_PQ_Context *db;
+
+ /**
+ * Task to run on timeout.
+ */
+ struct GNUNET_SCHEDULER_Task *timeout_task;
};
@@ -163,35 +168,10 @@ do_notify (void *cls,
void
-GNUNET_PQ_event_set_socket_callback (struct GNUNET_PQ_Context *db,
- GNUNET_PQ_SocketCallback sc,
- void *sc_cls)
-{
- int fd;
-
- db->sc = sc;
- db->sc_cls = sc_cls;
- if (NULL == sc)
- return;
- GNUNET_assert (0 ==
- pthread_mutex_lock (&db->notify_lock));
- fd = PQsocket (db->conn);
- if ( (-1 != fd) &&
- (0 != GNUNET_CONTAINER_multishortmap_size (db->channel_map)) )
- sc (sc_cls,
- fd);
- GNUNET_assert (0 ==
- pthread_mutex_unlock (&db->notify_lock));
-}
-
-
-void
GNUNET_PQ_event_do_poll (struct GNUNET_PQ_Context *db)
{
PGnotify *n;
- GNUNET_assert (0 ==
- pthread_mutex_lock (&db->notify_lock));
if (1 !=
PQconsumeInput (db->conn))
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
@@ -204,9 +184,17 @@ GNUNET_PQ_event_do_poll (struct GNUNET_PQ_Context *db)
.extra = NULL
};
+ if ('X' != toupper ((int) n->relname[0]))
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Ignoring notification for unsupported channel identifier `%s'\n",
+ n->relname);
+ PQfreemem (n);
+ continue;
+ }
if (GNUNET_OK !=
- GNUNET_STRINGS_string_to_data (n->relname,
- strlen (n->relname),
+ GNUNET_STRINGS_string_to_data (&n->relname[1],
+ strlen (&n->relname[1]),
&sh,
sizeof (sh)))
{
@@ -236,24 +224,10 @@ GNUNET_PQ_event_do_poll (struct GNUNET_PQ_Context *db)
GNUNET_free (ctx.extra);
PQfreemem (n);
}
- GNUNET_assert (0 ==
- pthread_mutex_unlock (&db->notify_lock));
}
/**
- * Function called when the Postgres FD changes and we need
- * to update the scheduler event loop task.
- *
- * @param cls a `struct GNUNET_PQ_Context *`
- * @param fd the file descriptor, possibly -1
- */
-static void
-scheduler_fd_cb (void *cls,
- int fd);
-
-
-/**
* The GNUnet scheduler notifies us that we need to
* trigger the DB event poller.
*
@@ -308,27 +282,21 @@ scheduler_fd_cb (void *cls,
void
-GNUNET_PQ_event_scheduler_start (struct GNUNET_PQ_Context *db)
+GNUNET_PQ_event_scheduler_start_ (struct GNUNET_PQ_Context *db)
{
- int fd;
-
- GNUNET_assert (! db->scheduler_on);
- GNUNET_assert (NULL == db->sc);
+ if (db->scheduler_on)
+ return;
db->scheduler_on = true;
- db->sc = &scheduler_fd_cb;
- db->sc_cls = db;
- fd = PQsocket (db->conn);
scheduler_fd_cb (db,
- fd);
+ PQsocket (db->conn));
}
void
-GNUNET_PQ_event_scheduler_stop (struct GNUNET_PQ_Context *db)
+GNUNET_PQ_event_scheduler_stop_ (struct GNUNET_PQ_Context *db)
{
GNUNET_assert (db->scheduler_on);
GNUNET_free (db->rfd);
- db->sc = NULL;
db->scheduler_on = false;
if (NULL != db->event_task)
{
@@ -338,6 +306,13 @@ GNUNET_PQ_event_scheduler_stop (struct GNUNET_PQ_Context *db)
}
+/**
+ * Helper function to trigger an SQL @a cmd on @a db
+ *
+ * @param db database to send command to
+ * @param cmd prefix of the command to send
+ * @param eh details about the event
+ */
static void
manage_subscribe (struct GNUNET_PQ_Context *db,
const char *cmd,
@@ -351,6 +326,9 @@ manage_subscribe (struct GNUNET_PQ_Context *db,
cmd);
end = sh_to_channel (&eh->sh,
end);
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Executing PQ command `%s'\n",
+ sql);
result = PQexec (db->conn,
sql);
if (PGRES_COMMAND_OK != PQresultStatus (result))
@@ -395,21 +373,41 @@ register_notify (void *cls,
void
-GNUNET_PQ_event_reconnect_ (struct GNUNET_PQ_Context *db)
+GNUNET_PQ_event_reconnect_ (struct GNUNET_PQ_Context *db,
+ int fd)
{
- GNUNET_assert (0 ==
- pthread_mutex_lock (&db->notify_lock));
+ if (! db->scheduler_on)
+ return;
+ scheduler_fd_cb (db,
+ fd);
GNUNET_CONTAINER_multishortmap_iterate (db->channel_map,
&register_notify,
db);
- GNUNET_assert (0 ==
- pthread_mutex_unlock (&db->notify_lock));
+}
+
+
+/**
+ * Function run on timeout for an event. Triggers
+ * the notification, but does NOT clear the handler.
+ *
+ * @param cls a `struct GNUNET_DB_EventHandler *`
+ */
+static void
+event_timeout (void *cls)
+{
+ struct GNUNET_DB_EventHandler *eh = cls;
+
+ eh->timeout_task = NULL;
+ eh->cb (eh->cb_cls,
+ NULL,
+ 0);
}
struct GNUNET_DB_EventHandler *
GNUNET_PQ_event_listen (struct GNUNET_PQ_Context *db,
const struct GNUNET_DB_EventHeaderP *es,
+ struct GNUNET_TIME_Relative timeout,
GNUNET_DB_EventCallback cb,
void *cb_cls)
{
@@ -422,28 +420,20 @@ GNUNET_PQ_event_listen (struct GNUNET_PQ_Context *db,
&eh->sh);
eh->cb = cb;
eh->cb_cls = cb_cls;
- GNUNET_assert (0 ==
- pthread_mutex_lock (&db->notify_lock));
was_zero = (0 == GNUNET_CONTAINER_multishortmap_size (db->channel_map));
GNUNET_assert (GNUNET_OK ==
GNUNET_CONTAINER_multishortmap_put (db->channel_map,
&eh->sh,
eh,
GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
- if ( (NULL != db->sc) &&
- was_zero)
- {
- int fd = PQsocket (db->conn);
-
- if (-1 != fd)
- db->sc (db->sc_cls,
- fd);
- }
+ if (was_zero)
+ GNUNET_PQ_event_scheduler_start_ (db);
manage_subscribe (db,
"LISTEN X",
eh);
- GNUNET_assert (0 ==
- pthread_mutex_unlock (&db->notify_lock));
+ eh->timeout_task = GNUNET_SCHEDULER_add_delayed (timeout,
+ &event_timeout,
+ eh);
return eh;
}
@@ -453,8 +443,6 @@ GNUNET_PQ_event_listen_cancel (struct GNUNET_DB_EventHandler *eh)
{
struct GNUNET_PQ_Context *db = eh->db;
- GNUNET_assert (0 ==
- pthread_mutex_lock (&db->notify_lock));
GNUNET_assert (GNUNET_OK ==
GNUNET_CONTAINER_multishortmap_remove (db->channel_map,
&eh->sh,
@@ -463,14 +451,15 @@ GNUNET_PQ_event_listen_cancel (struct GNUNET_DB_EventHandler *eh)
manage_subscribe (db,
"UNLISTEN X",
eh);
- if ( (NULL != db->sc) &&
- (0 == GNUNET_CONTAINER_multishortmap_size (db->channel_map)) )
+ if (0 == GNUNET_CONTAINER_multishortmap_size (db->channel_map))
+ {
+ GNUNET_PQ_event_scheduler_stop_ (db);
+ }
+ if (NULL != eh->timeout_task)
{
- db->sc (db->sc_cls,
- -1);
+ GNUNET_SCHEDULER_cancel (eh->timeout_task);
+ eh->timeout_task = NULL;
}
- GNUNET_assert (0 ==
- pthread_mutex_unlock (&db->notify_lock));
GNUNET_free (eh);
}
diff --git a/src/pq/test_pq.c b/src/pq/test_pq.c
index ffbb4d129..90b5c6489 100644
--- a/src/pq/test_pq.c
+++ b/src/pq/test_pq.c
@@ -240,63 +240,6 @@ run_queries (struct GNUNET_PQ_Context *db)
}
-static void
-event_cb (void *cls,
- const void *extra,
- size_t extra_size)
-{
- unsigned int *cnt = cls;
-
- GNUNET_assert (5 == extra_size);
- GNUNET_assert (0 == memcmp ("world",
- extra,
- 5));
- (*cnt)++;
-}
-
-
-/**
- * Run subscribe/notify tests.
- *
- * @param db database handle
- * @return 0 on success
- */
-static int
-test_notify (struct GNUNET_PQ_Context *db)
-{
- struct GNUNET_DB_EventHeaderP e1 = {
- .size = htons (sizeof (e1)),
- .type = htons (1)
- };
- struct GNUNET_DB_EventHeaderP e2 = {
- .size = htons (sizeof (e2)),
- .type = htons (2)
- };
- unsigned int called = 0;
- struct GNUNET_DB_EventHandler *eh;
-
- eh = GNUNET_PQ_event_listen (db,
- &e1,
- &event_cb,
- &called);
- GNUNET_assert (NULL != eh);
- GNUNET_PQ_event_notify (db,
- &e2,
- "hello",
- 5);
- GNUNET_PQ_event_do_poll (db);
- GNUNET_assert (0 == called);
- GNUNET_PQ_event_notify (db,
- &e1,
- "world",
- 5);
- GNUNET_PQ_event_do_poll (db);
- GNUNET_assert (1 == called);
- GNUNET_PQ_event_listen_cancel (eh);
- return 0;
-}
-
-
/**
* Task called on shutdown.
*
@@ -305,7 +248,6 @@ test_notify (struct GNUNET_PQ_Context *db)
static void
event_end (void *cls)
{
- GNUNET_PQ_event_scheduler_stop (db);
GNUNET_PQ_event_listen_cancel (eh);
eh = NULL;
if (NULL != tt)
@@ -368,9 +310,9 @@ sched_tests (void *cls)
tt = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS,
&timeout_cb,
NULL);
- GNUNET_PQ_event_scheduler_start (db);
eh = GNUNET_PQ_event_listen (db,
&es,
+ GNUNET_TIME_UNIT_FOREVER_REL,
&event_sched_cb,
NULL);
GNUNET_PQ_reconnect (db);
@@ -404,7 +346,7 @@ main (int argc,
};
GNUNET_log_setup ("test-pq",
- "WARNING",
+ "INFO",
NULL);
db = GNUNET_PQ_connect ("postgres:///gnunetcheck",
NULL,
@@ -433,8 +375,6 @@ main (int argc,
return 1;
}
ret = run_queries (db);
- ret |= test_notify (db);
- ret |= test_notify (db);
if (0 != ret)
{
GNUNET_break (0);
diff --git a/src/util/os_priority.c b/src/util/os_priority.c
index dc2f0f97e..08320b291 100644
--- a/src/util/os_priority.c
+++ b/src/util/os_priority.c
@@ -47,7 +47,6 @@ struct GNUNET_OS_Process
*/
pid_t pid;
-
/**
* Pipe we use to signal the process.
* NULL if unused, or if process was deemed uncontrollable.
@@ -301,7 +300,8 @@ GNUNET_OS_process_destroy (struct GNUNET_OS_Process *proc)
* @param flags open flags (O_RDONLY, O_WRONLY)
*/
static void
-open_dev_null (int target_fd, int flags)
+open_dev_null (int target_fd,
+ int flags)
{
int fd;