aboutsummaryrefslogtreecommitdiff
path: root/src/lib/pq/pq_event.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/lib/pq/pq_event.c')
-rw-r--r--src/lib/pq/pq_event.c584
1 files changed, 584 insertions, 0 deletions
diff --git a/src/lib/pq/pq_event.c b/src/lib/pq/pq_event.c
new file mode 100644
index 000000000..fff9e16f7
--- /dev/null
+++ b/src/lib/pq/pq_event.c
@@ -0,0 +1,584 @@
1/*
2 This file is part of GNUnet
3 Copyright (C) 2021, 2023 GNUnet e.V.
4
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
14
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18 SPDX-License-Identifier: AGPL3.0-or-later
19 */
20/**
21 * @file pq/pq_event.c
22 * @brief event notifications via Postgres
23 * @author Christian Grothoff
24 */
25#include "platform.h"
26#include "pq.h"
27#include <pthread.h>
28
29
30/**
31 * Handle for an active LISTENer to the database.
32 */
33struct GNUNET_DB_EventHandler
34{
35 /**
36 * Channel name.
37 */
38 struct GNUNET_ShortHashCode sh;
39
40 /**
41 * Function to call on events.
42 */
43 GNUNET_DB_EventCallback cb;
44
45 /**
46 * Closure for @e cb.
47 */
48 void *cb_cls;
49
50 /**
51 * Database context this event handler is with.
52 */
53 struct GNUNET_PQ_Context *db;
54
55 /**
56 * Task to run on timeout.
57 */
58 struct GNUNET_SCHEDULER_Task *timeout_task;
59};
60
61
62/**
63 * Convert @a es to a short hash.
64 *
65 * @param es spec to hash to an identifier
66 * @param[out] sh short hash to set
67 */
68static void
69es_to_sh (const struct GNUNET_DB_EventHeaderP *es,
70 struct GNUNET_ShortHashCode *sh)
71{
72 struct GNUNET_HashCode h_channel;
73
74 GNUNET_CRYPTO_hash (es,
75 ntohs (es->size),
76 &h_channel);
77 GNUNET_static_assert (sizeof (*sh) <= sizeof (h_channel));
78 memcpy (sh,
79 &h_channel,
80 sizeof (*sh));
81}
82
83
84/**
85 * Convert @a sh to a Postgres identifier.
86 *
87 * @param sh short hash to convert to an identifier
88 * @param[out] identifier by default, Postgres supports
89 * NAMEDATALEN=64 character identifiers
90 * @return end position of the identifier
91 */
92static char *
93sh_to_channel (struct GNUNET_ShortHashCode *sh,
94 char identifier[64])
95{
96 char *end;
97
98 end = GNUNET_STRINGS_data_to_string (sh,
99 sizeof (*sh),
100 identifier,
101 63);
102 GNUNET_assert (NULL != end);
103 *end = '\0';
104 return end;
105}
106
107
108/**
109 * Convert @a sh to a Postgres identifier.
110 *
111 * @param identifier to convert
112 * @param[out] sh set to short hash
113 * @return #GNUNET_OK on success
114 */
115static enum GNUNET_GenericReturnValue
116channel_to_sh (const char *identifier,
117 struct GNUNET_ShortHashCode *sh)
118{
119 return GNUNET_STRINGS_string_to_data (identifier,
120 strlen (identifier),
121 sh,
122 sizeof (*sh));
123}
124
125
126/**
127 * Convert @a es to a Postgres identifier.
128 *
129 * @param es spec to hash to an identifier
130 * @param[out] identifier by default, Postgres supports
131 * NAMEDATALEN=64 character identifiers
132 * @return end position of the identifier
133 */
134static char *
135es_to_channel (const struct GNUNET_DB_EventHeaderP *es,
136 char identifier[64])
137{
138 struct GNUNET_ShortHashCode sh;
139
140 es_to_sh (es,
141 &sh);
142 return sh_to_channel (&sh,
143 identifier);
144}
145
146
147/**
148 * Closure for #do_notify().
149 */
150struct NotifyContext
151{
152 /**
153 * Extra argument of the notification, or NULL.
154 */
155 void *extra;
156
157 /**
158 * Number of bytes in @e extra.
159 */
160 size_t extra_size;
161};
162
163
164/**
165 * Function called on every event handler that
166 * needs to be triggered.
167 *
168 * @param cls a `struct NotifyContext`
169 * @param sh channel name
170 * @param value a `struct GNUNET_DB_EventHandler`
171 * @return #GNUNET_OK continue to iterate
172 */
173static enum GNUNET_GenericReturnValue
174do_notify (void *cls,
175 const struct GNUNET_ShortHashCode *sh,
176 void *value)
177{
178 struct NotifyContext *ctx = cls;
179 struct GNUNET_DB_EventHandler *eh = value;
180
181 eh->cb (eh->cb_cls,
182 ctx->extra,
183 ctx->extra_size);
184 return GNUNET_OK;
185}
186
187
188void
189GNUNET_PQ_event_do_poll (struct GNUNET_PQ_Context *db)
190{
191 PGnotify *n;
192 unsigned int cnt = 0;
193
194 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
195 "PG poll job active\n");
196 if (1 !=
197 PQconsumeInput (db->conn))
198 {
199 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
200 "Failed to read from Postgres: %s\n",
201 PQerrorMessage (db->conn));
202 if (CONNECTION_BAD != PQstatus (db->conn))
203 return;
204 GNUNET_PQ_reconnect (db);
205 return;
206 }
207 while (NULL != (n = PQnotifies (db->conn)))
208 {
209 struct GNUNET_ShortHashCode sh;
210 struct NotifyContext ctx = {
211 .extra = NULL
212 };
213
214 cnt++;
215 if ('X' != toupper ((int) n->relname[0]))
216 {
217 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
218 "Ignoring notification for unsupported channel identifier `%s'\n",
219 n->relname);
220 PQfreemem (n);
221 continue;
222 }
223 if (GNUNET_OK !=
224 channel_to_sh (&n->relname[1],
225 &sh))
226 {
227 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
228 "Ignoring notification for unsupported channel identifier `%s'\n",
229 n->relname);
230 PQfreemem (n);
231 continue;
232 }
233 if ( (NULL != n->extra) &&
234 (GNUNET_OK !=
235 GNUNET_STRINGS_string_to_data_alloc (n->extra,
236 strlen (n->extra),
237 &ctx.extra,
238 &ctx.extra_size)))
239 {
240 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
241 "Ignoring notification for unsupported extra data `%s' on channel `%s'\n",
242 n->extra,
243 n->relname);
244 PQfreemem (n);
245 continue;
246 }
247 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
248 "Received notification %s with extra data `%.*s'\n",
249 n->relname,
250 (int) ctx.extra_size,
251 (const char *) ctx.extra);
252 GNUNET_CONTAINER_multishortmap_get_multiple (db->channel_map,
253 &sh,
254 &do_notify,
255 &ctx);
256 GNUNET_free (ctx.extra);
257 PQfreemem (n);
258 }
259 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
260 "PG poll job finishes after %u events\n",
261 cnt);
262}
263
264
265/**
266 * The GNUnet scheduler notifies us that we need to
267 * trigger the DB event poller.
268 *
269 * @param cls a `struct GNUNET_PQ_Context *`
270 */
271static void
272do_scheduler_notify (void *cls)
273{
274 struct GNUNET_PQ_Context *db = cls;
275
276 db->event_task = NULL;
277 if (NULL == db->rfd)
278 GNUNET_PQ_reconnect (db);
279 GNUNET_PQ_event_do_poll (db);
280 if (NULL != db->event_task)
281 return;
282 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
283 "Resubscribing\n");
284 if (NULL == db->rfd)
285 {
286 db->resubscribe_backoff
287 = GNUNET_TIME_relative_max (db->resubscribe_backoff,
288 GNUNET_TIME_UNIT_SECONDS);
289 db->resubscribe_backoff
290 = GNUNET_TIME_STD_BACKOFF (db->resubscribe_backoff);
291 db->event_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS,
292 &do_scheduler_notify,
293 db);
294 return;
295 }
296 db->resubscribe_backoff = GNUNET_TIME_UNIT_SECONDS;
297 db->event_task
298 = GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_FOREVER_REL,
299 db->rfd,
300 &do_scheduler_notify,
301 db);
302}
303
304
305/**
306 * Function called when the Postgres FD changes and we need
307 * to update the scheduler event loop task.
308 *
309 * @param cls a `struct GNUNET_PQ_Context *`
310 * @param fd the file descriptor, possibly -1
311 */
312static void
313scheduler_fd_cb (void *cls,
314 int fd)
315{
316 struct GNUNET_PQ_Context *db = cls;
317
318 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
319 "New poll FD is %d\n",
320 fd);
321 if (NULL != db->event_task)
322 {
323 GNUNET_SCHEDULER_cancel (db->event_task);
324 db->event_task = NULL;
325 }
326 GNUNET_free (db->rfd);
327 if (-1 == fd)
328 return;
329 if (0 == GNUNET_CONTAINER_multishortmap_size (db->channel_map))
330 return;
331 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
332 "Activating poll job on %d\n",
333 fd);
334 db->rfd = GNUNET_NETWORK_socket_box_native (fd);
335 db->event_task
336 = GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_ZERO,
337 db->rfd,
338 &do_scheduler_notify,
339 db);
340}
341
342
343/**
344 * Helper function to trigger an SQL @a cmd on @a db
345 *
346 * @param db database to send command to
347 * @param cmd prefix of the command to send
348 * @param eh details about the event
349 */
350static void
351manage_subscribe (struct GNUNET_PQ_Context *db,
352 const char *cmd,
353 struct GNUNET_DB_EventHandler *eh)
354{
355 char sql[16 + 64];
356 char *end;
357 PGresult *result;
358
359 if (NULL == db->conn)
360 return;
361 end = stpcpy (sql,
362 cmd);
363 end = sh_to_channel (&eh->sh,
364 end);
365 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
366 "Executing PQ command `%s'\n",
367 sql);
368 result = PQexec (db->conn,
369 sql);
370 if (PGRES_COMMAND_OK != PQresultStatus (result))
371 {
372 GNUNET_log_from (GNUNET_ERROR_TYPE_ERROR,
373 "pq",
374 "Failed to execute `%s': %s/%s/%s/%s/%s",
375 sql,
376 PQresultErrorField (result,
377 PG_DIAG_MESSAGE_PRIMARY),
378 PQresultErrorField (result,
379 PG_DIAG_MESSAGE_DETAIL),
380 PQresultErrorMessage (result),
381 PQresStatus (PQresultStatus (result)),
382 PQerrorMessage (db->conn));
383 }
384 PQclear (result);
385}
386
387
388/**
389 * Re-subscribe to notifications after disconnect.
390 *
391 * @param cls the DB context
392 * @param sh the short hash of the channel
393 * @param value the event handler
394 * @return #GNUNET_OK to continue to iterate
395 */
396static enum GNUNET_GenericReturnValue
397register_notify (void *cls,
398 const struct GNUNET_ShortHashCode *sh,
399 void *value)
400{
401 struct GNUNET_PQ_Context *db = cls;
402 struct GNUNET_DB_EventHandler *eh = value;
403
404 manage_subscribe (db,
405 "LISTEN X",
406 eh);
407 return GNUNET_OK;
408}
409
410
411void
412GNUNET_PQ_event_reconnect_ (struct GNUNET_PQ_Context *db,
413 int fd)
414{
415 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
416 "Change in PQ event FD to %d\n",
417 fd);
418 scheduler_fd_cb (db,
419 fd);
420 GNUNET_CONTAINER_multishortmap_iterate (db->channel_map,
421 &register_notify,
422 db);
423}
424
425
426/**
427 * Function run on timeout for an event. Triggers
428 * the notification, but does NOT clear the handler.
429 *
430 * @param cls a `struct GNUNET_DB_EventHandler *`
431 */
432static void
433event_timeout (void *cls)
434{
435 struct GNUNET_DB_EventHandler *eh = cls;
436
437 eh->timeout_task = NULL;
438 eh->cb (eh->cb_cls,
439 NULL,
440 0);
441}
442
443
444struct GNUNET_DB_EventHandler *
445GNUNET_PQ_event_listen (struct GNUNET_PQ_Context *db,
446 const struct GNUNET_DB_EventHeaderP *es,
447 struct GNUNET_TIME_Relative timeout,
448 GNUNET_DB_EventCallback cb,
449 void *cb_cls)
450{
451 struct GNUNET_DB_EventHandler *eh;
452 bool sub;
453
454 eh = GNUNET_new (struct GNUNET_DB_EventHandler);
455 eh->db = db;
456 es_to_sh (es,
457 &eh->sh);
458 eh->cb = cb;
459 eh->cb_cls = cb_cls;
460 sub = (NULL ==
461 GNUNET_CONTAINER_multishortmap_get (db->channel_map,
462 &eh->sh));
463 GNUNET_assert (GNUNET_OK ==
464 GNUNET_CONTAINER_multishortmap_put (db->channel_map,
465 &eh->sh,
466 eh,
467 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
468 if (NULL == db->event_task)
469 {
470 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
471 "Starting event scheduler\n");
472 scheduler_fd_cb (db,
473 PQsocket (db->conn));
474 }
475 if (sub)
476 manage_subscribe (db,
477 "LISTEN X",
478 eh);
479 eh->timeout_task = GNUNET_SCHEDULER_add_delayed (timeout,
480 &event_timeout,
481 eh);
482 return eh;
483}
484
485
486void
487GNUNET_PQ_event_listen_cancel (struct GNUNET_DB_EventHandler *eh)
488{
489 struct GNUNET_PQ_Context *db = eh->db;
490
491 GNUNET_assert (GNUNET_OK ==
492 GNUNET_CONTAINER_multishortmap_remove (db->channel_map,
493 &eh->sh,
494 eh));
495 if (NULL ==
496 GNUNET_CONTAINER_multishortmap_get (db->channel_map,
497 &eh->sh))
498 manage_subscribe (db,
499 "UNLISTEN X",
500 eh);
501 if (0 == GNUNET_CONTAINER_multishortmap_size (db->channel_map))
502 {
503 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
504 "Stopping PQ event scheduler job\n");
505 GNUNET_free (db->rfd);
506 if (NULL != db->event_task)
507 {
508 GNUNET_SCHEDULER_cancel (db->event_task);
509 db->event_task = NULL;
510 }
511 }
512 if (NULL != eh->timeout_task)
513 {
514 GNUNET_SCHEDULER_cancel (eh->timeout_task);
515 eh->timeout_task = NULL;
516 }
517 GNUNET_free (eh);
518}
519
520
521char *
522GNUNET_PG_get_event_notify_channel (const struct GNUNET_DB_EventHeaderP *es)
523{
524 char sql[16 + 64 + 8];
525 char *end;
526
527 end = stpcpy (sql,
528 "X");
529 end = es_to_channel (es,
530 end);
531 GNUNET_assert (NULL != end);
532 return GNUNET_strdup (sql);
533}
534
535
536void
537GNUNET_PQ_event_notify (struct GNUNET_PQ_Context *db,
538 const struct GNUNET_DB_EventHeaderP *es,
539 const void *extra,
540 size_t extra_size)
541{
542 char sql[16 + 64 + extra_size * 8 / 5 + 8];
543 char *end;
544 PGresult *result;
545
546 end = stpcpy (sql,
547 "NOTIFY X");
548 end = es_to_channel (es,
549 end);
550 end = stpcpy (end,
551 ", '");
552 end = GNUNET_STRINGS_data_to_string (extra,
553 extra_size,
554 end,
555 sizeof (sql) - (end - sql) - 1);
556 GNUNET_assert (NULL != end);
557 *end = '\0';
558 end = stpcpy (end,
559 "'");
560 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
561 "Executing command `%s'\n",
562 sql);
563 result = PQexec (db->conn,
564 sql);
565 if (PGRES_COMMAND_OK != PQresultStatus (result))
566 {
567 GNUNET_log_from (GNUNET_ERROR_TYPE_ERROR,
568 "pq",
569 "Failed to execute `%s': %s/%s/%s/%s/%s",
570 sql,
571 PQresultErrorField (result,
572 PG_DIAG_MESSAGE_PRIMARY),
573 PQresultErrorField (result,
574 PG_DIAG_MESSAGE_DETAIL),
575 PQresultErrorMessage (result),
576 PQresStatus (PQresultStatus (result)),
577 PQerrorMessage (db->conn));
578 }
579 PQclear (result);
580 GNUNET_PQ_event_do_poll (db);
581}
582
583
584/* end of pq_event.c */