aboutsummaryrefslogtreecommitdiff
path: root/src/pq/pq_event.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/pq/pq_event.c')
-rw-r--r--src/pq/pq_event.c558
1 files changed, 0 insertions, 558 deletions
diff --git a/src/pq/pq_event.c b/src/pq/pq_event.c
deleted file mode 100644
index 98a28c317..000000000
--- a/src/pq/pq_event.c
+++ /dev/null
@@ -1,558 +0,0 @@
1/*
2 This file is part of GNUnet
3 Copyright (C) 2021 GNUnet e.V.
4
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
14
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18 SPDX-License-Identifier: AGPL3.0-or-later
19 */
20/**
21 * @file pq/pq_event.c
22 * @brief event notifications via Postgres
23 * @author Christian Grothoff
24 */
25#include "platform.h"
26#include "pq.h"
27#include <pthread.h>
28
29
30/**
31 * Handle for an active LISTENer to the database.
32 */
33struct GNUNET_DB_EventHandler
34{
35 /**
36 * Channel name.
37 */
38 struct GNUNET_ShortHashCode sh;
39
40 /**
41 * Function to call on events.
42 */
43 GNUNET_DB_EventCallback cb;
44
45 /**
46 * Closure for @e cb.
47 */
48 void *cb_cls;
49
50 /**
51 * Database context this event handler is with.
52 */
53 struct GNUNET_PQ_Context *db;
54
55 /**
56 * Task to run on timeout.
57 */
58 struct GNUNET_SCHEDULER_Task *timeout_task;
59};
60
61
62/**
63 * Convert @a es to a short hash.
64 *
65 * @param es spec to hash to an identifier
66 * @param[out] sh short hash to set
67 */
68static void
69es_to_sh (const struct GNUNET_DB_EventHeaderP *es,
70 struct GNUNET_ShortHashCode *sh)
71{
72 struct GNUNET_HashCode h_channel;
73
74 GNUNET_CRYPTO_hash (es,
75 ntohs (es->size),
76 &h_channel);
77 GNUNET_static_assert (sizeof (*sh) <= sizeof (h_channel));
78 memcpy (sh,
79 &h_channel,
80 sizeof (*sh));
81}
82
83
84/**
85 * Convert @a sh to a Postgres identifier.
86 *
87 * @param sh short hash to convert to an identifier
88 * @param[out] identifier by default, Postgres supports
89 * NAMEDATALEN=64 character identifiers
90 * @return end position of the identifier
91 */
92static char *
93sh_to_channel (struct GNUNET_ShortHashCode *sh,
94 char identifier[64])
95{
96 char *end;
97
98 end = GNUNET_STRINGS_data_to_string (sh,
99 sizeof (*sh),
100 identifier,
101 63);
102 GNUNET_assert (NULL != end);
103 *end = '\0';
104 return end;
105}
106
107
108/**
109 * Convert @a sh to a Postgres identifier.
110 *
111 * @param identifier to convert
112 * @param[out] sh set to short hash
113 * @return #GNUNET_OK on success
114 */
115static enum GNUNET_GenericReturnValue
116channel_to_sh (const char *identifier,
117 struct GNUNET_ShortHashCode *sh)
118{
119 return GNUNET_STRINGS_string_to_data (identifier,
120 strlen (identifier),
121 sh,
122 sizeof (*sh));
123}
124
125
126/**
127 * Convert @a es to a Postgres identifier.
128 *
129 * @param es spec to hash to an identifier
130 * @param[out] identifier by default, Postgres supports
131 * NAMEDATALEN=64 character identifiers
132 * @return end position of the identifier
133 */
134static char *
135es_to_channel (const struct GNUNET_DB_EventHeaderP *es,
136 char identifier[64])
137{
138 struct GNUNET_ShortHashCode sh;
139
140 es_to_sh (es,
141 &sh);
142 return sh_to_channel (&sh,
143 identifier);
144}
145
146
147/**
148 * Closure for #do_notify().
149 */
150struct NotifyContext
151{
152 /**
153 * Extra argument of the notification, or NULL.
154 */
155 void *extra;
156
157 /**
158 * Number of bytes in @e extra.
159 */
160 size_t extra_size;
161};
162
163
164/**
165 * Function called on every event handler that
166 * needs to be triggered.
167 *
168 * @param cls a `struct NotifyContext`
169 * @param sh channel name
170 * @param value a `struct GNUNET_DB_EventHandler`
171 * @return #GNUNET_OK continue to iterate
172 */
173static int
174do_notify (void *cls,
175 const struct GNUNET_ShortHashCode *sh,
176 void *value)
177{
178 struct NotifyContext *ctx = cls;
179 struct GNUNET_DB_EventHandler *eh = value;
180
181 eh->cb (eh->cb_cls,
182 ctx->extra,
183 ctx->extra_size);
184 return GNUNET_OK;
185}
186
187
188static void
189event_do_poll (struct GNUNET_PQ_Context *db)
190{
191 PGnotify *n;
192
193 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
194 "PG poll job active\n");
195 if (1 !=
196 PQconsumeInput (db->conn))
197 {
198 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
199 "Failed to read from Postgres: %s\n",
200 PQerrorMessage (db->conn));
201 if (CONNECTION_BAD != PQstatus (db->conn))
202 return;
203 GNUNET_PQ_reconnect (db);
204 return;
205 }
206 while (NULL != (n = PQnotifies (db->conn)))
207 {
208 struct GNUNET_ShortHashCode sh;
209 struct NotifyContext ctx = {
210 .extra = NULL
211 };
212
213 if ('X' != toupper ((int) n->relname[0]))
214 {
215 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
216 "Ignoring notification for unsupported channel identifier `%s'\n",
217 n->relname);
218 PQfreemem (n);
219 continue;
220 }
221 if (GNUNET_OK !=
222 channel_to_sh (&n->relname[1],
223 &sh))
224 {
225 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
226 "Ignoring notification for unsupported channel identifier `%s'\n",
227 n->relname);
228 PQfreemem (n);
229 continue;
230 }
231 if ( (NULL != n->extra) &&
232 (GNUNET_OK !=
233 GNUNET_STRINGS_string_to_data_alloc (n->extra,
234 strlen (n->extra),
235 &ctx.extra,
236 &ctx.extra_size)))
237 {
238 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
239 "Ignoring notification for unsupported extra data `%s' on channel `%s'\n",
240 n->extra,
241 n->relname);
242 PQfreemem (n);
243 continue;
244 }
245 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
246 "Received notification %s with extra data `%.*s'\n",
247 n->relname,
248 (int) ctx.extra_size,
249 (const char *) ctx.extra);
250 GNUNET_CONTAINER_multishortmap_get_multiple (db->channel_map,
251 &sh,
252 &do_notify,
253 &ctx);
254 GNUNET_free (ctx.extra);
255 PQfreemem (n);
256 }
257}
258
259
260/**
261 * The GNUnet scheduler notifies us that we need to
262 * trigger the DB event poller.
263 *
264 * @param cls a `struct GNUNET_PQ_Context *`
265 */
266static void
267do_scheduler_notify (void *cls)
268{
269 struct GNUNET_PQ_Context *db = cls;
270
271 db->event_task = NULL;
272 if (NULL == db->rfd)
273 GNUNET_PQ_reconnect (db);
274 event_do_poll (db);
275 if (NULL != db->event_task)
276 return;
277 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
278 "Resubscribing\n");
279 if (NULL == db->rfd)
280 {
281 db->event_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS,
282 &do_scheduler_notify,
283 db);
284 return;
285 }
286 db->event_task
287 = GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_FOREVER_REL,
288 db->rfd,
289 &do_scheduler_notify,
290 db);
291}
292
293
294/**
295 * Function called when the Postgres FD changes and we need
296 * to update the scheduler event loop task.
297 *
298 * @param cls a `struct GNUNET_PQ_Context *`
299 * @param fd the file descriptor, possibly -1
300 */
301static void
302scheduler_fd_cb (void *cls,
303 int fd)
304{
305 struct GNUNET_PQ_Context *db = cls;
306
307 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
308 "New poll FD is %d\n",
309 fd);
310 if (NULL != db->event_task)
311 {
312 GNUNET_SCHEDULER_cancel (db->event_task);
313 db->event_task = NULL;
314 }
315 GNUNET_free (db->rfd);
316 if (-1 == fd)
317 return;
318 if (0 == GNUNET_CONTAINER_multishortmap_size (db->channel_map))
319 return;
320 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
321 "Activating poll job on %d\n",
322 fd);
323 db->rfd = GNUNET_NETWORK_socket_box_native (fd);
324 db->event_task
325 = GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_ZERO,
326 db->rfd,
327 &do_scheduler_notify,
328 db);
329}
330
331
332/**
333 * Helper function to trigger an SQL @a cmd on @a db
334 *
335 * @param db database to send command to
336 * @param cmd prefix of the command to send
337 * @param eh details about the event
338 */
339static void
340manage_subscribe (struct GNUNET_PQ_Context *db,
341 const char *cmd,
342 struct GNUNET_DB_EventHandler *eh)
343{
344 char sql[16 + 64];
345 char *end;
346 PGresult *result;
347
348 if (NULL == db->conn)
349 return;
350 end = stpcpy (sql,
351 cmd);
352 end = sh_to_channel (&eh->sh,
353 end);
354 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
355 "Executing PQ command `%s'\n",
356 sql);
357 result = PQexec (db->conn,
358 sql);
359 if (PGRES_COMMAND_OK != PQresultStatus (result))
360 {
361 GNUNET_log_from (GNUNET_ERROR_TYPE_ERROR,
362 "pq",
363 "Failed to execute `%s': %s/%s/%s/%s/%s",
364 sql,
365 PQresultErrorField (result,
366 PG_DIAG_MESSAGE_PRIMARY),
367 PQresultErrorField (result,
368 PG_DIAG_MESSAGE_DETAIL),
369 PQresultErrorMessage (result),
370 PQresStatus (PQresultStatus (result)),
371 PQerrorMessage (db->conn));
372 }
373 PQclear (result);
374}
375
376
377/**
378 * Re-subscribe to notifications after disconnect.
379 *
380 * @param cls the DB context
381 * @param sh the short hash of the channel
382 * @param eh the event handler
383 * @return #GNUNET_OK to continue to iterate
384 */
385static int
386register_notify (void *cls,
387 const struct GNUNET_ShortHashCode *sh,
388 void *value)
389{
390 struct GNUNET_PQ_Context *db = cls;
391 struct GNUNET_DB_EventHandler *eh = value;
392
393 manage_subscribe (db,
394 "LISTEN X",
395 eh);
396 return GNUNET_OK;
397}
398
399
400void
401GNUNET_PQ_event_reconnect_ (struct GNUNET_PQ_Context *db,
402 int fd)
403{
404 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
405 "Change in PQ event FD to %d\n",
406 fd);
407 scheduler_fd_cb (db,
408 fd);
409 GNUNET_CONTAINER_multishortmap_iterate (db->channel_map,
410 &register_notify,
411 db);
412}
413
414
415/**
416 * Function run on timeout for an event. Triggers
417 * the notification, but does NOT clear the handler.
418 *
419 * @param cls a `struct GNUNET_DB_EventHandler *`
420 */
421static void
422event_timeout (void *cls)
423{
424 struct GNUNET_DB_EventHandler *eh = cls;
425
426 eh->timeout_task = NULL;
427 eh->cb (eh->cb_cls,
428 NULL,
429 0);
430}
431
432
433struct GNUNET_DB_EventHandler *
434GNUNET_PQ_event_listen (struct GNUNET_PQ_Context *db,
435 const struct GNUNET_DB_EventHeaderP *es,
436 struct GNUNET_TIME_Relative timeout,
437 GNUNET_DB_EventCallback cb,
438 void *cb_cls)
439{
440 struct GNUNET_DB_EventHandler *eh;
441 bool sub;
442
443 eh = GNUNET_new (struct GNUNET_DB_EventHandler);
444 eh->db = db;
445 es_to_sh (es,
446 &eh->sh);
447 eh->cb = cb;
448 eh->cb_cls = cb_cls;
449 sub = (NULL ==
450 GNUNET_CONTAINER_multishortmap_get (db->channel_map,
451 &eh->sh));
452 GNUNET_assert (GNUNET_OK ==
453 GNUNET_CONTAINER_multishortmap_put (db->channel_map,
454 &eh->sh,
455 eh,
456 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
457 if (NULL == db->event_task)
458 {
459 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
460 "Starting event scheduler\n");
461 scheduler_fd_cb (db,
462 PQsocket (db->conn));
463 }
464 if (sub)
465 manage_subscribe (db,
466 "LISTEN X",
467 eh);
468 eh->timeout_task = GNUNET_SCHEDULER_add_delayed (timeout,
469 &event_timeout,
470 eh);
471 return eh;
472}
473
474
475void
476GNUNET_PQ_event_listen_cancel (struct GNUNET_DB_EventHandler *eh)
477{
478 struct GNUNET_PQ_Context *db = eh->db;
479
480 GNUNET_assert (GNUNET_OK ==
481 GNUNET_CONTAINER_multishortmap_remove (db->channel_map,
482 &eh->sh,
483 eh));
484 if (NULL ==
485 GNUNET_CONTAINER_multishortmap_get (db->channel_map,
486 &eh->sh))
487 manage_subscribe (db,
488 "UNLISTEN X",
489 eh);
490 if (0 == GNUNET_CONTAINER_multishortmap_size (db->channel_map))
491 {
492 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
493 "Stopping PQ event scheduler job\n");
494 GNUNET_free (db->rfd);
495 if (NULL != db->event_task)
496 {
497 GNUNET_SCHEDULER_cancel (db->event_task);
498 db->event_task = NULL;
499 }
500 }
501 if (NULL != eh->timeout_task)
502 {
503 GNUNET_SCHEDULER_cancel (eh->timeout_task);
504 eh->timeout_task = NULL;
505 }
506 GNUNET_free (eh);
507}
508
509
510void
511GNUNET_PQ_event_notify (struct GNUNET_PQ_Context *db,
512 const struct GNUNET_DB_EventHeaderP *es,
513 const void *extra,
514 size_t extra_size)
515{
516 char sql[16 + 64 + extra_size * 8 / 5 + 8];
517 char *end;
518 PGresult *result;
519
520 end = stpcpy (sql,
521 "NOTIFY X");
522 end = es_to_channel (es,
523 end);
524 end = stpcpy (end,
525 ", '");
526 end = GNUNET_STRINGS_data_to_string (extra,
527 extra_size,
528 end,
529 sizeof (sql) - (end - sql) - 1);
530 GNUNET_assert (NULL != end);
531 *end = '\0';
532 end = stpcpy (end,
533 "'");
534 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
535 "Executing command `%s'\n",
536 sql);
537 result = PQexec (db->conn,
538 sql);
539 if (PGRES_COMMAND_OK != PQresultStatus (result))
540 {
541 GNUNET_log_from (GNUNET_ERROR_TYPE_ERROR,
542 "pq",
543 "Failed to execute `%s': %s/%s/%s/%s/%s",
544 sql,
545 PQresultErrorField (result,
546 PG_DIAG_MESSAGE_PRIMARY),
547 PQresultErrorField (result,
548 PG_DIAG_MESSAGE_DETAIL),
549 PQresultErrorMessage (result),
550 PQresStatus (PQresultStatus (result)),
551 PQerrorMessage (db->conn));
552 }
553 PQclear (result);
554 event_do_poll (db);
555}
556
557
558/* end of pq_event.c */