summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/include/gnunet_pq_lib.h13
-rw-r--r--src/pq/pq.h23
-rw-r--r--src/pq/pq_event.c109
3 files changed, 65 insertions, 80 deletions
diff --git a/src/include/gnunet_pq_lib.h b/src/include/gnunet_pq_lib.h
index ff4498938..fe3fabbea 100644
--- a/src/include/gnunet_pq_lib.h
+++ b/src/include/gnunet_pq_lib.h
@@ -853,19 +853,6 @@ GNUNET_PQ_reconnect (struct GNUNET_PQ_Context *db);
/**
- * Poll for database events now. Used if the event FD
- * is ready and the application wants to trigger applicable
- * events.
- * Useful if the GNUnet scheduler is NOT to be used for
- * such notifications.
- *
- * @param db database handle
- */
-void
-GNUNET_PQ_event_do_poll (struct GNUNET_PQ_Context *db);
-
-
-/**
* Register callback to be invoked on events of type @a es.
*
* Unlike many other calls, this function is thread-safe
diff --git a/src/pq/pq.h b/src/pq/pq.h
index 950d38220..d10931d99 100644
--- a/src/pq/pq.h
+++ b/src/pq/pq.h
@@ -73,11 +73,6 @@ struct GNUNET_PQ_Context
* File descriptor wrapper for @e event_task.
*/
struct GNUNET_NETWORK_Handle *rfd;
-
- /**
- * Is scheduling via the GNUnet scheduler desired?
- */
- bool scheduler_on;
};
@@ -93,22 +88,4 @@ GNUNET_PQ_event_reconnect_ (struct GNUNET_PQ_Context *db,
int fd);
-/**
- * Run poll event loop using the GNUnet scheduler.
- *
- * @param db database handle
- */
-void
-GNUNET_PQ_event_scheduler_start_ (struct GNUNET_PQ_Context *db);
-
-
-/**
- * Stop running poll event loop using the GNUnet scheduler.
- *
- * @param db database handle
- */
-void
-GNUNET_PQ_event_scheduler_stop_ (struct GNUNET_PQ_Context *db);
-
-
#endif
diff --git a/src/pq/pq_event.c b/src/pq/pq_event.c
index 3a0bfcde3..79a2e80c6 100644
--- a/src/pq/pq_event.c
+++ b/src/pq/pq_event.c
@@ -106,6 +106,24 @@ sh_to_channel (struct GNUNET_ShortHashCode *sh,
/**
+ * Convert @a sh to a Postgres identifier.
+ *
+ * @param identifier to convert
+ * @param[out] sh set to short hash
+ * @return #GNUNET_OK on success
+ */
+static enum GNUNET_GenericReturnValue
+channel_to_sh (const char *identifier,
+ struct GNUNET_ShortHashCode *sh)
+{
+ return GNUNET_STRINGS_string_to_data (identifier,
+ strlen (identifier),
+ sh,
+ sizeof (*sh));
+}
+
+
+/**
* Convert @a es to a Postgres identifier.
*
* @param es spec to hash to an identifier
@@ -167,11 +185,13 @@ do_notify (void *cls,
}
-void
-GNUNET_PQ_event_do_poll (struct GNUNET_PQ_Context *db)
+static void
+event_do_poll (struct GNUNET_PQ_Context *db)
{
PGnotify *n;
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "PG poll job active\n");
if (1 !=
PQconsumeInput (db->conn))
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
@@ -193,10 +213,8 @@ GNUNET_PQ_event_do_poll (struct GNUNET_PQ_Context *db)
continue;
}
if (GNUNET_OK !=
- GNUNET_STRINGS_string_to_data (&n->relname[1],
- strlen (&n->relname[1]),
- &sh,
- sizeof (sh)))
+ channel_to_sh (&n->relname[1],
+ &sh))
{
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
"Ignoring notification for unsupported channel identifier `%s'\n",
@@ -218,9 +236,15 @@ GNUNET_PQ_event_do_poll (struct GNUNET_PQ_Context *db)
PQfreemem (n);
continue;
}
- GNUNET_CONTAINER_multishortmap_iterate (db->channel_map,
- &do_notify,
- &ctx);
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Received notification %s with extra data `%.*s'\n",
+ n->relname,
+ (int) ctx.extra_size,
+ (const char *) ctx.extra);
+ GNUNET_CONTAINER_multishortmap_get_multiple (db->channel_map,
+ &sh,
+ &do_notify,
+ &ctx);
GNUNET_free (ctx.extra);
PQfreemem (n);
}
@@ -238,9 +262,11 @@ do_scheduler_notify (void *cls)
{
struct GNUNET_PQ_Context *db = cls;
- GNUNET_assert (db->scheduler_on);
+ db->event_task = NULL;
GNUNET_assert (NULL != db->rfd);
- GNUNET_PQ_event_do_poll (db);
+ event_do_poll (db);
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Resubscribing\n");
db->event_task
= GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_FOREVER_REL,
db->rfd,
@@ -262,6 +288,9 @@ scheduler_fd_cb (void *cls,
{
struct GNUNET_PQ_Context *db = cls;
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "New poll FD is %d\n",
+ fd);
if (NULL != db->event_task)
{
GNUNET_SCHEDULER_cancel (db->event_task);
@@ -272,6 +301,9 @@ scheduler_fd_cb (void *cls,
return;
if (0 == GNUNET_CONTAINER_multishortmap_size (db->channel_map))
return;
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Activating poll job on %d\n",
+ fd);
db->rfd = GNUNET_NETWORK_socket_box_native (fd);
db->event_task
= GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_ZERO,
@@ -281,31 +313,6 @@ scheduler_fd_cb (void *cls,
}
-void
-GNUNET_PQ_event_scheduler_start_ (struct GNUNET_PQ_Context *db)
-{
- if (db->scheduler_on)
- return;
- db->scheduler_on = true;
- scheduler_fd_cb (db,
- PQsocket (db->conn));
-}
-
-
-void
-GNUNET_PQ_event_scheduler_stop_ (struct GNUNET_PQ_Context *db)
-{
- GNUNET_assert (db->scheduler_on);
- GNUNET_free (db->rfd);
- db->scheduler_on = false;
- if (NULL != db->event_task)
- {
- GNUNET_SCHEDULER_cancel (db->event_task);
- db->event_task = NULL;
- }
-}
-
-
/**
* Helper function to trigger an SQL @a cmd on @a db
*
@@ -376,8 +383,9 @@ void
GNUNET_PQ_event_reconnect_ (struct GNUNET_PQ_Context *db,
int fd)
{
- if (! db->scheduler_on)
- return;
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Change in PQ event FD to %d\n",
+ fd);
scheduler_fd_cb (db,
fd);
GNUNET_CONTAINER_multishortmap_iterate (db->channel_map,
@@ -412,7 +420,6 @@ GNUNET_PQ_event_listen (struct GNUNET_PQ_Context *db,
void *cb_cls)
{
struct GNUNET_DB_EventHandler *eh;
- bool was_zero;
eh = GNUNET_new (struct GNUNET_DB_EventHandler);
eh->db = db;
@@ -420,14 +427,18 @@ GNUNET_PQ_event_listen (struct GNUNET_PQ_Context *db,
&eh->sh);
eh->cb = cb;
eh->cb_cls = cb_cls;
- was_zero = (0 == GNUNET_CONTAINER_multishortmap_size (db->channel_map));
GNUNET_assert (GNUNET_OK ==
GNUNET_CONTAINER_multishortmap_put (db->channel_map,
&eh->sh,
eh,
GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
- if (was_zero)
- GNUNET_PQ_event_scheduler_start_ (db);
+ if (NULL == db->event_task)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Starting event scheduler\n");
+ scheduler_fd_cb (db,
+ PQsocket (db->conn));
+ }
manage_subscribe (db,
"LISTEN X",
eh);
@@ -447,13 +458,19 @@ GNUNET_PQ_event_listen_cancel (struct GNUNET_DB_EventHandler *eh)
GNUNET_CONTAINER_multishortmap_remove (db->channel_map,
&eh->sh,
eh));
-
manage_subscribe (db,
"UNLISTEN X",
eh);
if (0 == GNUNET_CONTAINER_multishortmap_size (db->channel_map))
{
- GNUNET_PQ_event_scheduler_stop_ (db);
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Stopping PQ event scheduler job\n");
+ GNUNET_free (db->rfd);
+ if (NULL != db->event_task)
+ {
+ GNUNET_SCHEDULER_cancel (db->event_task);
+ db->event_task = NULL;
+ }
}
if (NULL != eh->timeout_task)
{
@@ -488,6 +505,9 @@ GNUNET_PQ_event_notify (struct GNUNET_PQ_Context *db,
*end = '\0';
end = stpcpy (end,
"'");
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Executing command `%s'\n",
+ sql);
result = PQexec (db->conn,
sql);
if (PGRES_COMMAND_OK != PQresultStatus (result))
@@ -505,6 +525,7 @@ GNUNET_PQ_event_notify (struct GNUNET_PQ_Context *db,
PQerrorMessage (db->conn));
}
PQclear (result);
+ event_do_poll (db);
}