summaryrefslogtreecommitdiff
path: root/src/pq/pq_event.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/pq/pq_event.c')
-rw-r--r--src/pq/pq_event.c145
1 files changed, 67 insertions, 78 deletions
diff --git a/src/pq/pq_event.c b/src/pq/pq_event.c
index 2890869a3..3a0bfcde3 100644
--- a/src/pq/pq_event.c
+++ b/src/pq/pq_event.c
@@ -51,6 +51,11 @@ struct GNUNET_DB_EventHandler
* Database context this event handler is with.
*/
struct GNUNET_PQ_Context *db;
+
+ /**
+ * Task to run on timeout.
+ */
+ struct GNUNET_SCHEDULER_Task *timeout_task;
};
@@ -163,35 +168,10 @@ do_notify (void *cls,
void
-GNUNET_PQ_event_set_socket_callback (struct GNUNET_PQ_Context *db,
- GNUNET_PQ_SocketCallback sc,
- void *sc_cls)
-{
- int fd;
-
- db->sc = sc;
- db->sc_cls = sc_cls;
- if (NULL == sc)
- return;
- GNUNET_assert (0 ==
- pthread_mutex_lock (&db->notify_lock));
- fd = PQsocket (db->conn);
- if ( (-1 != fd) &&
- (0 != GNUNET_CONTAINER_multishortmap_size (db->channel_map)) )
- sc (sc_cls,
- fd);
- GNUNET_assert (0 ==
- pthread_mutex_unlock (&db->notify_lock));
-}
-
-
-void
GNUNET_PQ_event_do_poll (struct GNUNET_PQ_Context *db)
{
PGnotify *n;
- GNUNET_assert (0 ==
- pthread_mutex_lock (&db->notify_lock));
if (1 !=
PQconsumeInput (db->conn))
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
@@ -204,9 +184,17 @@ GNUNET_PQ_event_do_poll (struct GNUNET_PQ_Context *db)
.extra = NULL
};
+ if ('X' != toupper ((int) n->relname[0]))
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Ignoring notification for unsupported channel identifier `%s'\n",
+ n->relname);
+ PQfreemem (n);
+ continue;
+ }
if (GNUNET_OK !=
- GNUNET_STRINGS_string_to_data (n->relname,
- strlen (n->relname),
+ GNUNET_STRINGS_string_to_data (&n->relname[1],
+ strlen (&n->relname[1]),
&sh,
sizeof (sh)))
{
@@ -236,24 +224,10 @@ GNUNET_PQ_event_do_poll (struct GNUNET_PQ_Context *db)
GNUNET_free (ctx.extra);
PQfreemem (n);
}
- GNUNET_assert (0 ==
- pthread_mutex_unlock (&db->notify_lock));
}
/**
- * Function called when the Postgres FD changes and we need
- * to update the scheduler event loop task.
- *
- * @param cls a `struct GNUNET_PQ_Context *`
- * @param fd the file descriptor, possibly -1
- */
-static void
-scheduler_fd_cb (void *cls,
- int fd);
-
-
-/**
* The GNUnet scheduler notifies us that we need to
* trigger the DB event poller.
*
@@ -308,27 +282,21 @@ scheduler_fd_cb (void *cls,
void
-GNUNET_PQ_event_scheduler_start (struct GNUNET_PQ_Context *db)
+GNUNET_PQ_event_scheduler_start_ (struct GNUNET_PQ_Context *db)
{
- int fd;
-
- GNUNET_assert (! db->scheduler_on);
- GNUNET_assert (NULL == db->sc);
+ if (db->scheduler_on)
+ return;
db->scheduler_on = true;
- db->sc = &scheduler_fd_cb;
- db->sc_cls = db;
- fd = PQsocket (db->conn);
scheduler_fd_cb (db,
- fd);
+ PQsocket (db->conn));
}
void
-GNUNET_PQ_event_scheduler_stop (struct GNUNET_PQ_Context *db)
+GNUNET_PQ_event_scheduler_stop_ (struct GNUNET_PQ_Context *db)
{
GNUNET_assert (db->scheduler_on);
GNUNET_free (db->rfd);
- db->sc = NULL;
db->scheduler_on = false;
if (NULL != db->event_task)
{
@@ -338,6 +306,13 @@ GNUNET_PQ_event_scheduler_stop (struct GNUNET_PQ_Context *db)
}
+/**
+ * Helper function to trigger an SQL @a cmd on @a db
+ *
+ * @param db database to send command to
+ * @param cmd prefix of the command to send
+ * @param eh details about the event
+ */
static void
manage_subscribe (struct GNUNET_PQ_Context *db,
const char *cmd,
@@ -351,6 +326,9 @@ manage_subscribe (struct GNUNET_PQ_Context *db,
cmd);
end = sh_to_channel (&eh->sh,
end);
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Executing PQ command `%s'\n",
+ sql);
result = PQexec (db->conn,
sql);
if (PGRES_COMMAND_OK != PQresultStatus (result))
@@ -395,21 +373,41 @@ register_notify (void *cls,
void
-GNUNET_PQ_event_reconnect_ (struct GNUNET_PQ_Context *db)
+GNUNET_PQ_event_reconnect_ (struct GNUNET_PQ_Context *db,
+ int fd)
{
- GNUNET_assert (0 ==
- pthread_mutex_lock (&db->notify_lock));
+ if (! db->scheduler_on)
+ return;
+ scheduler_fd_cb (db,
+ fd);
GNUNET_CONTAINER_multishortmap_iterate (db->channel_map,
&register_notify,
db);
- GNUNET_assert (0 ==
- pthread_mutex_unlock (&db->notify_lock));
+}
+
+
+/**
+ * Function run on timeout for an event. Triggers
+ * the notification, but does NOT clear the handler.
+ *
+ * @param cls a `struct GNUNET_DB_EventHandler *`
+ */
+static void
+event_timeout (void *cls)
+{
+ struct GNUNET_DB_EventHandler *eh = cls;
+
+ eh->timeout_task = NULL;
+ eh->cb (eh->cb_cls,
+ NULL,
+ 0);
}
struct GNUNET_DB_EventHandler *
GNUNET_PQ_event_listen (struct GNUNET_PQ_Context *db,
const struct GNUNET_DB_EventHeaderP *es,
+ struct GNUNET_TIME_Relative timeout,
GNUNET_DB_EventCallback cb,
void *cb_cls)
{
@@ -422,28 +420,20 @@ GNUNET_PQ_event_listen (struct GNUNET_PQ_Context *db,
&eh->sh);
eh->cb = cb;
eh->cb_cls = cb_cls;
- GNUNET_assert (0 ==
- pthread_mutex_lock (&db->notify_lock));
was_zero = (0 == GNUNET_CONTAINER_multishortmap_size (db->channel_map));
GNUNET_assert (GNUNET_OK ==
GNUNET_CONTAINER_multishortmap_put (db->channel_map,
&eh->sh,
eh,
GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
- if ( (NULL != db->sc) &&
- was_zero)
- {
- int fd = PQsocket (db->conn);
-
- if (-1 != fd)
- db->sc (db->sc_cls,
- fd);
- }
+ if (was_zero)
+ GNUNET_PQ_event_scheduler_start_ (db);
manage_subscribe (db,
"LISTEN X",
eh);
- GNUNET_assert (0 ==
- pthread_mutex_unlock (&db->notify_lock));
+ eh->timeout_task = GNUNET_SCHEDULER_add_delayed (timeout,
+ &event_timeout,
+ eh);
return eh;
}
@@ -453,8 +443,6 @@ GNUNET_PQ_event_listen_cancel (struct GNUNET_DB_EventHandler *eh)
{
struct GNUNET_PQ_Context *db = eh->db;
- GNUNET_assert (0 ==
- pthread_mutex_lock (&db->notify_lock));
GNUNET_assert (GNUNET_OK ==
GNUNET_CONTAINER_multishortmap_remove (db->channel_map,
&eh->sh,
@@ -463,14 +451,15 @@ GNUNET_PQ_event_listen_cancel (struct GNUNET_DB_EventHandler *eh)
manage_subscribe (db,
"UNLISTEN X",
eh);
- if ( (NULL != db->sc) &&
- (0 == GNUNET_CONTAINER_multishortmap_size (db->channel_map)) )
+ if (0 == GNUNET_CONTAINER_multishortmap_size (db->channel_map))
+ {
+ GNUNET_PQ_event_scheduler_stop_ (db);
+ }
+ if (NULL != eh->timeout_task)
{
- db->sc (db->sc_cls,
- -1);
+ GNUNET_SCHEDULER_cancel (eh->timeout_task);
+ eh->timeout_task = NULL;
}
- GNUNET_assert (0 ==
- pthread_mutex_unlock (&db->notify_lock));
GNUNET_free (eh);
}