aboutsummaryrefslogtreecommitdiff
path: root/src/pq/pq_event.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/pq/pq_event.c')
-rw-r--r--src/pq/pq_event.c396
1 files changed, 396 insertions, 0 deletions
diff --git a/src/pq/pq_event.c b/src/pq/pq_event.c
new file mode 100644
index 000000000..ecb942230
--- /dev/null
+++ b/src/pq/pq_event.c
@@ -0,0 +1,396 @@
1/*
2 This file is part of GNUnet
3 Copyright (C) 2021 GNUnet e.V.
4
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
14
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18 SPDX-License-Identifier: AGPL3.0-or-later
19 */
20/**
21 * @file pq/pq_event.c
22 * @brief event notifications via Postgres
23 * @author Christian Grothoff
24 */
25#include "platform.h"
26#include "pq.h"
27#include <pthread.h>
28
29
30/**
31 * Handle for an active LISTENer to the database.
32 */
33struct GNUNET_PQ_EventHandler
34{
35 /**
36 * Channel name.
37 */
38 struct GNUNET_ShortHashCode sh;
39
40 /**
41 * Function to call on events.
42 */
43 GNUNET_PQ_EventCallback cb;
44
45 /**
46 * Closure for @e cb.
47 */
48 void *cb_cls;
49
50 /**
51 * Database context this event handler is with.
52 */
53 struct GNUNET_PQ_Context *db;
54};
55
56
57
58
59/**
60 * Convert @a es to a short hash.
61 *
62 * @param es spec to hash to an identifier
63 * @param[out] sh short hash to set
64 */
65static void
66es_to_sh (const struct GNUNET_PQ_EventHeaderP *es,
67 struct GNUNET_ShortHashCode *sh)
68{
69 struct GNUNET_HashCode h_channel;
70
71 GNUNET_CRYPTO_hash (es,
72 ntohs (es->size),
73 &h_channel);
74 GNUNET_static_assert (sizeof (*sh) <= sizeof (h_channel));
75 memcpy (sh,
76 &h_channel,
77 sizeof (*sh));
78}
79
80
81/**
82 * Convert @a sh to a Postgres identifier.
83 *
84 * @param sh short hash to convert to an identifier
85 * @param[out] identifier by default, Postgres supports
86 * NAMEDATALEN=64 character identifiers
87 * @return end position of the identifier
88 */
89static char *
90sh_to_channel (struct GNUNET_ShortHashCode *sh,
91 char identifier[64])
92{
93 char *end;
94
95 end = GNUNET_STRINGS_data_to_string (sh,
96 sizeof (*sh),
97 identifier,
98 63);
99 GNUNET_assert (NULL != end);
100 *end = '\0';
101 return end;
102}
103
104
105/**
106 * Convert @a es to a Postgres identifier.
107 *
108 * @param es spec to hash to an identifier
109 * @param[out] identifier by default, Postgres supports
110 * NAMEDATALEN=64 character identifiers
111 * @return end position of the identifier
112 */
113static char *
114es_to_channel (const struct GNUNET_PQ_EventHeaderP *es,
115 char identifier[64])
116{
117 struct GNUNET_ShortHashCode sh;
118
119 es_to_sh (es,
120 &sh);
121 return sh_to_channel (&sh,
122 identifier);
123}
124
125
126/**
127 * Closure for #do_notify().
128 */
129struct NotifyContext
130{
131 /**
132 * Extra argument of the notification, or NULL.
133 */
134 void *extra;
135
136 /**
137 * Number of bytes in @e extra.
138 */
139 size_t extra_size;
140};
141
142
143/**
144 * Function called on every event handler that
145 * needs to be triggered.
146 *
147 * @param cls a `struct NotifyContext`
148 * @param sh channel name
149 * @param value a `struct GNUNET_PQ_EventHandler`
150 * @return #GNUNET_OK continue to iterate
151 */
152static int
153do_notify (void *cls,
154 const struct GNUNET_ShortHashCode *sh,
155 void *value)
156{
157 struct NotifyContext *ctx = cls;
158 struct GNUNET_PQ_EventHandler *eh = value;
159
160 eh->cb (eh->cb_cls,
161 ctx->extra,
162 ctx->extra_size);
163 return GNUNET_OK;
164}
165
166
167void
168GNUNET_PQ_event_set_socket_callback (struct GNUNET_PQ_Context *db,
169 GNUNET_PQ_SocketCallback sc,
170 void *sc_cls)
171{
172 int fd;
173
174 db->sc = sc;
175 db->sc_cls = sc_cls;
176 if (NULL == sc)
177 return;
178 GNUNET_assert (0 ==
179 pthread_mutex_lock (&db->notify_lock));
180 fd = PQsocket (db->conn);
181 if ( (-1 != fd) &&
182 (0 != GNUNET_CONTAINER_multishortmap_size (db->channel_map)) )
183 sc (sc_cls,
184 fd);
185 GNUNET_assert (0 ==
186 pthread_mutex_unlock (&db->notify_lock));
187}
188
189
190void
191GNUNET_PQ_event_do_poll (struct GNUNET_PQ_Context *db)
192{
193 PGnotify *n;
194
195 GNUNET_assert (0 ==
196 pthread_mutex_lock (&db->notify_lock));
197 while (NULL != (n = PQnotifies (db->conn)))
198 {
199 struct GNUNET_ShortHashCode sh;
200 struct NotifyContext ctx = {
201 .extra = NULL
202 };
203
204 if (GNUNET_OK !=
205 GNUNET_STRINGS_string_to_data (n->relname,
206 strlen (n->relname),
207 &sh,
208 sizeof (sh)))
209 {
210 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
211 "Ignoring notification for unsupported channel identifier `%s'\n",
212 n->relname);
213 continue;
214 }
215 if ( (NULL != n->extra) &&
216 (GNUNET_OK !=
217 GNUNET_STRINGS_string_to_data_alloc (n->extra,
218 strlen (n->extra),
219 &ctx.extra,
220 &ctx.extra_size)))
221 {
222 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
223 "Ignoring notification for unsupported extra data `%s' on channel `%s'\n",
224 n->extra,
225 n->relname);
226 continue;
227 }
228 GNUNET_CONTAINER_multishortmap_iterate (db->channel_map,
229 &do_notify,
230 &ctx);
231 GNUNET_free (ctx.extra);
232 }
233 GNUNET_assert (0 ==
234 pthread_mutex_unlock (&db->notify_lock));
235}
236
237
238void
239GNUNET_PQ_event_scheduler_start (struct GNUNET_PQ_Context *db)
240{
241 GNUNET_break (0); // FIXME: not implemented
242}
243
244
245void
246GNUNET_PQ_event_scheduler_stop (struct GNUNET_PQ_Context *db)
247{
248 GNUNET_break (0); // FIXME: not implemented
249}
250
251
252static void
253manage_subscribe (struct GNUNET_PQ_Context *db,
254 const char *cmd,
255 struct GNUNET_PQ_EventHandler *eh)
256{
257 char sql[16 + 64];
258 char *end;
259 PGresult *result;
260
261 end = stpcpy (sql,
262 cmd);
263 end = sh_to_channel (&eh->sh,
264 end);
265 result = PQexec (db->conn,
266 sql);
267 if (PGRES_COMMAND_OK != PQresultStatus (result))
268 {
269 GNUNET_log_from (GNUNET_ERROR_TYPE_ERROR,
270 "pq",
271 "Failed to execute `%s': %s/%s/%s/%s/%s",
272 sql,
273 PQresultErrorField (result,
274 PG_DIAG_MESSAGE_PRIMARY),
275 PQresultErrorField (result,
276 PG_DIAG_MESSAGE_DETAIL),
277 PQresultErrorMessage (result),
278 PQresStatus (PQresultStatus (result)),
279 PQerrorMessage (db->conn));
280 }
281 PQclear (result);
282}
283
284
285struct GNUNET_PQ_EventHandler *
286GNUNET_PQ_event_listen (struct GNUNET_PQ_Context *db,
287 const struct GNUNET_PQ_EventHeaderP *es,
288 GNUNET_PQ_EventCallback cb,
289 void *cb_cls)
290{
291 struct GNUNET_PQ_EventHandler *eh;
292 bool was_zero;
293
294 eh = GNUNET_new (struct GNUNET_PQ_EventHandler);
295 eh->db = db;
296 es_to_sh (es,
297 &eh->sh);
298 eh->cb = cb;
299 eh->cb_cls = cb_cls;
300 GNUNET_assert (0 ==
301 pthread_mutex_lock (&db->notify_lock));
302 was_zero = (0 == GNUNET_CONTAINER_multishortmap_size (db->channel_map));
303 GNUNET_assert (GNUNET_OK ==
304 GNUNET_CONTAINER_multishortmap_put (db->channel_map,
305 &eh->sh,
306 eh,
307 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
308 if ( (NULL != db->sc) &&
309 was_zero)
310 {
311 int fd = PQsocket (db->conn);
312
313 if (-1 != fd)
314 db->sc (db->sc_cls,
315 fd);
316 }
317 manage_subscribe (db,
318 "LISTEN ",
319 eh);
320 GNUNET_assert (0 ==
321 pthread_mutex_unlock (&db->notify_lock));
322 return eh;
323}
324
325
326void
327GNUNET_PQ_event_listen_cancel (struct GNUNET_PQ_EventHandler *eh)
328{
329 struct GNUNET_PQ_Context *db = eh->db;
330
331 GNUNET_assert (0 ==
332 pthread_mutex_lock (&db->notify_lock));
333 GNUNET_assert (GNUNET_OK ==
334 GNUNET_CONTAINER_multishortmap_remove (db->channel_map,
335 &eh->sh,
336 eh));
337
338 manage_subscribe (db,
339 "UNLISTEN ",
340 eh);
341 if ( (NULL != db->sc) &&
342 (0 == GNUNET_CONTAINER_multishortmap_size (db->channel_map)) )
343 {
344 db->sc (db->sc_cls,
345 -1);
346 }
347 GNUNET_assert (0 ==
348 pthread_mutex_unlock (&db->notify_lock));
349}
350
351
352void
353GNUNET_PQ_event_notify (struct GNUNET_PQ_Context *db,
354 const struct GNUNET_PQ_EventHeaderP *es,
355 const void *extra,
356 size_t extra_size)
357{
358 char sql[16 + 64 + extra_size * 8 / 5 + 8];
359 char *end;
360 PGresult *result;
361
362 end = stpcpy (sql,
363 "NOTIFY ");
364 end = es_to_channel (es,
365 end);
366 end = stpcpy (end,
367 "'");
368 end = GNUNET_STRINGS_data_to_string (extra,
369 extra_size,
370 end,
371 sizeof (sql) - (end - sql) - 1);
372 GNUNET_assert (NULL != end);
373 *end = '\0';
374 end = stpcpy (end,
375 "'");
376 result = PQexec (db->conn,
377 sql);
378 if (PGRES_COMMAND_OK != PQresultStatus (result))
379 {
380 GNUNET_log_from (GNUNET_ERROR_TYPE_ERROR,
381 "pq",
382 "Failed to execute `%s': %s/%s/%s/%s/%s",
383 sql,
384 PQresultErrorField (result,
385 PG_DIAG_MESSAGE_PRIMARY),
386 PQresultErrorField (result,
387 PG_DIAG_MESSAGE_DETAIL),
388 PQresultErrorMessage (result),
389 PQresStatus (PQresultStatus (result)),
390 PQerrorMessage (db->conn));
391 }
392 PQclear (result);
393}
394
395/* end of pq_event.c */
396