aboutsummaryrefslogtreecommitdiff
path: root/src/pq/pq_event.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/pq/pq_event.c')
-rw-r--r--src/pq/pq_event.c563
1 files changed, 0 insertions, 563 deletions
diff --git a/src/pq/pq_event.c b/src/pq/pq_event.c
deleted file mode 100644
index aff52dd5c..000000000
--- a/src/pq/pq_event.c
+++ /dev/null
@@ -1,563 +0,0 @@
1/*
2 This file is part of GNUnet
3 Copyright (C) 2021 GNUnet e.V.
4
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
14
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18 SPDX-License-Identifier: AGPL3.0-or-later
19 */
20/**
21 * @file pq/pq_event.c
22 * @brief event notifications via Postgres
23 * @author Christian Grothoff
24 */
25#include "platform.h"
26#include "pq.h"
27#include <pthread.h>
28
29
30/**
31 * Handle for an active LISTENer to the database.
32 */
33struct GNUNET_DB_EventHandler
34{
35 /**
36 * Channel name.
37 */
38 struct GNUNET_ShortHashCode sh;
39
40 /**
41 * Function to call on events.
42 */
43 GNUNET_DB_EventCallback cb;
44
45 /**
46 * Closure for @e cb.
47 */
48 void *cb_cls;
49
50 /**
51 * Database context this event handler is with.
52 */
53 struct GNUNET_PQ_Context *db;
54
55 /**
56 * Task to run on timeout.
57 */
58 struct GNUNET_SCHEDULER_Task *timeout_task;
59};
60
61
62/**
63 * Convert @a es to a short hash.
64 *
65 * @param es spec to hash to an identifier
66 * @param[out] sh short hash to set
67 */
68static void
69es_to_sh (const struct GNUNET_DB_EventHeaderP *es,
70 struct GNUNET_ShortHashCode *sh)
71{
72 struct GNUNET_HashCode h_channel;
73
74 GNUNET_CRYPTO_hash (es,
75 ntohs (es->size),
76 &h_channel);
77 GNUNET_static_assert (sizeof (*sh) <= sizeof (h_channel));
78 memcpy (sh,
79 &h_channel,
80 sizeof (*sh));
81}
82
83
84/**
85 * Convert @a sh to a Postgres identifier.
86 *
87 * @param sh short hash to convert to an identifier
88 * @param[out] identifier by default, Postgres supports
89 * NAMEDATALEN=64 character identifiers
90 * @return end position of the identifier
91 */
92static char *
93sh_to_channel (struct GNUNET_ShortHashCode *sh,
94 char identifier[64])
95{
96 char *end;
97
98 end = GNUNET_STRINGS_data_to_string (sh,
99 sizeof (*sh),
100 identifier,
101 63);
102 GNUNET_assert (NULL != end);
103 *end = '\0';
104 return end;
105}
106
107
108/**
109 * Convert @a sh to a Postgres identifier.
110 *
111 * @param identifier to convert
112 * @param[out] sh set to short hash
113 * @return #GNUNET_OK on success
114 */
115static enum GNUNET_GenericReturnValue
116channel_to_sh (const char *identifier,
117 struct GNUNET_ShortHashCode *sh)
118{
119 return GNUNET_STRINGS_string_to_data (identifier,
120 strlen (identifier),
121 sh,
122 sizeof (*sh));
123}
124
125
126/**
127 * Convert @a es to a Postgres identifier.
128 *
129 * @param es spec to hash to an identifier
130 * @param[out] identifier by default, Postgres supports
131 * NAMEDATALEN=64 character identifiers
132 * @return end position of the identifier
133 */
134static char *
135es_to_channel (const struct GNUNET_DB_EventHeaderP *es,
136 char identifier[64])
137{
138 struct GNUNET_ShortHashCode sh;
139
140 es_to_sh (es,
141 &sh);
142 return sh_to_channel (&sh,
143 identifier);
144}
145
146
147/**
148 * Closure for #do_notify().
149 */
150struct NotifyContext
151{
152 /**
153 * Extra argument of the notification, or NULL.
154 */
155 void *extra;
156
157 /**
158 * Number of bytes in @e extra.
159 */
160 size_t extra_size;
161};
162
163
164/**
165 * Function called on every event handler that
166 * needs to be triggered.
167 *
168 * @param cls a `struct NotifyContext`
169 * @param sh channel name
170 * @param value a `struct GNUNET_DB_EventHandler`
171 * @return #GNUNET_OK continue to iterate
172 */
173static int
174do_notify (void *cls,
175 const struct GNUNET_ShortHashCode *sh,
176 void *value)
177{
178 struct NotifyContext *ctx = cls;
179 struct GNUNET_DB_EventHandler *eh = value;
180
181 eh->cb (eh->cb_cls,
182 ctx->extra,
183 ctx->extra_size);
184 return GNUNET_OK;
185}
186
187
188static void
189event_do_poll (struct GNUNET_PQ_Context *db)
190{
191 PGnotify *n;
192 unsigned int cnt = 0;
193
194 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
195 "PG poll job active\n");
196 if (1 !=
197 PQconsumeInput (db->conn))
198 {
199 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
200 "Failed to read from Postgres: %s\n",
201 PQerrorMessage (db->conn));
202 if (CONNECTION_BAD != PQstatus (db->conn))
203 return;
204 GNUNET_PQ_reconnect (db);
205 return;
206 }
207 while (NULL != (n = PQnotifies (db->conn)))
208 {
209 struct GNUNET_ShortHashCode sh;
210 struct NotifyContext ctx = {
211 .extra = NULL
212 };
213
214 cnt++;
215 if ('X' != toupper ((int) n->relname[0]))
216 {
217 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
218 "Ignoring notification for unsupported channel identifier `%s'\n",
219 n->relname);
220 PQfreemem (n);
221 continue;
222 }
223 if (GNUNET_OK !=
224 channel_to_sh (&n->relname[1],
225 &sh))
226 {
227 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
228 "Ignoring notification for unsupported channel identifier `%s'\n",
229 n->relname);
230 PQfreemem (n);
231 continue;
232 }
233 if ( (NULL != n->extra) &&
234 (GNUNET_OK !=
235 GNUNET_STRINGS_string_to_data_alloc (n->extra,
236 strlen (n->extra),
237 &ctx.extra,
238 &ctx.extra_size)))
239 {
240 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
241 "Ignoring notification for unsupported extra data `%s' on channel `%s'\n",
242 n->extra,
243 n->relname);
244 PQfreemem (n);
245 continue;
246 }
247 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
248 "Received notification %s with extra data `%.*s'\n",
249 n->relname,
250 (int) ctx.extra_size,
251 (const char *) ctx.extra);
252 GNUNET_CONTAINER_multishortmap_get_multiple (db->channel_map,
253 &sh,
254 &do_notify,
255 &ctx);
256 GNUNET_free (ctx.extra);
257 PQfreemem (n);
258 }
259 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
260 "PG poll job finishes after %u events\n",
261 cnt);
262}
263
264
265/**
266 * The GNUnet scheduler notifies us that we need to
267 * trigger the DB event poller.
268 *
269 * @param cls a `struct GNUNET_PQ_Context *`
270 */
271static void
272do_scheduler_notify (void *cls)
273{
274 struct GNUNET_PQ_Context *db = cls;
275
276 db->event_task = NULL;
277 if (NULL == db->rfd)
278 GNUNET_PQ_reconnect (db);
279 event_do_poll (db);
280 if (NULL != db->event_task)
281 return;
282 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
283 "Resubscribing\n");
284 if (NULL == db->rfd)
285 {
286 db->event_task = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS,
287 &do_scheduler_notify,
288 db);
289 return;
290 }
291 db->event_task
292 = GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_FOREVER_REL,
293 db->rfd,
294 &do_scheduler_notify,
295 db);
296}
297
298
299/**
300 * Function called when the Postgres FD changes and we need
301 * to update the scheduler event loop task.
302 *
303 * @param cls a `struct GNUNET_PQ_Context *`
304 * @param fd the file descriptor, possibly -1
305 */
306static void
307scheduler_fd_cb (void *cls,
308 int fd)
309{
310 struct GNUNET_PQ_Context *db = cls;
311
312 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
313 "New poll FD is %d\n",
314 fd);
315 if (NULL != db->event_task)
316 {
317 GNUNET_SCHEDULER_cancel (db->event_task);
318 db->event_task = NULL;
319 }
320 GNUNET_free (db->rfd);
321 if (-1 == fd)
322 return;
323 if (0 == GNUNET_CONTAINER_multishortmap_size (db->channel_map))
324 return;
325 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
326 "Activating poll job on %d\n",
327 fd);
328 db->rfd = GNUNET_NETWORK_socket_box_native (fd);
329 db->event_task
330 = GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_ZERO,
331 db->rfd,
332 &do_scheduler_notify,
333 db);
334}
335
336
337/**
338 * Helper function to trigger an SQL @a cmd on @a db
339 *
340 * @param db database to send command to
341 * @param cmd prefix of the command to send
342 * @param eh details about the event
343 */
344static void
345manage_subscribe (struct GNUNET_PQ_Context *db,
346 const char *cmd,
347 struct GNUNET_DB_EventHandler *eh)
348{
349 char sql[16 + 64];
350 char *end;
351 PGresult *result;
352
353 if (NULL == db->conn)
354 return;
355 end = stpcpy (sql,
356 cmd);
357 end = sh_to_channel (&eh->sh,
358 end);
359 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
360 "Executing PQ command `%s'\n",
361 sql);
362 result = PQexec (db->conn,
363 sql);
364 if (PGRES_COMMAND_OK != PQresultStatus (result))
365 {
366 GNUNET_log_from (GNUNET_ERROR_TYPE_ERROR,
367 "pq",
368 "Failed to execute `%s': %s/%s/%s/%s/%s",
369 sql,
370 PQresultErrorField (result,
371 PG_DIAG_MESSAGE_PRIMARY),
372 PQresultErrorField (result,
373 PG_DIAG_MESSAGE_DETAIL),
374 PQresultErrorMessage (result),
375 PQresStatus (PQresultStatus (result)),
376 PQerrorMessage (db->conn));
377 }
378 PQclear (result);
379}
380
381
382/**
383 * Re-subscribe to notifications after disconnect.
384 *
385 * @param cls the DB context
386 * @param sh the short hash of the channel
387 * @param eh the event handler
388 * @return #GNUNET_OK to continue to iterate
389 */
390static int
391register_notify (void *cls,
392 const struct GNUNET_ShortHashCode *sh,
393 void *value)
394{
395 struct GNUNET_PQ_Context *db = cls;
396 struct GNUNET_DB_EventHandler *eh = value;
397
398 manage_subscribe (db,
399 "LISTEN X",
400 eh);
401 return GNUNET_OK;
402}
403
404
405void
406GNUNET_PQ_event_reconnect_ (struct GNUNET_PQ_Context *db,
407 int fd)
408{
409 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
410 "Change in PQ event FD to %d\n",
411 fd);
412 scheduler_fd_cb (db,
413 fd);
414 GNUNET_CONTAINER_multishortmap_iterate (db->channel_map,
415 &register_notify,
416 db);
417}
418
419
420/**
421 * Function run on timeout for an event. Triggers
422 * the notification, but does NOT clear the handler.
423 *
424 * @param cls a `struct GNUNET_DB_EventHandler *`
425 */
426static void
427event_timeout (void *cls)
428{
429 struct GNUNET_DB_EventHandler *eh = cls;
430
431 eh->timeout_task = NULL;
432 eh->cb (eh->cb_cls,
433 NULL,
434 0);
435}
436
437
438struct GNUNET_DB_EventHandler *
439GNUNET_PQ_event_listen (struct GNUNET_PQ_Context *db,
440 const struct GNUNET_DB_EventHeaderP *es,
441 struct GNUNET_TIME_Relative timeout,
442 GNUNET_DB_EventCallback cb,
443 void *cb_cls)
444{
445 struct GNUNET_DB_EventHandler *eh;
446 bool sub;
447
448 eh = GNUNET_new (struct GNUNET_DB_EventHandler);
449 eh->db = db;
450 es_to_sh (es,
451 &eh->sh);
452 eh->cb = cb;
453 eh->cb_cls = cb_cls;
454 sub = (NULL ==
455 GNUNET_CONTAINER_multishortmap_get (db->channel_map,
456 &eh->sh));
457 GNUNET_assert (GNUNET_OK ==
458 GNUNET_CONTAINER_multishortmap_put (db->channel_map,
459 &eh->sh,
460 eh,
461 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
462 if (NULL == db->event_task)
463 {
464 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
465 "Starting event scheduler\n");
466 scheduler_fd_cb (db,
467 PQsocket (db->conn));
468 }
469 if (sub)
470 manage_subscribe (db,
471 "LISTEN X",
472 eh);
473 eh->timeout_task = GNUNET_SCHEDULER_add_delayed (timeout,
474 &event_timeout,
475 eh);
476 return eh;
477}
478
479
480void
481GNUNET_PQ_event_listen_cancel (struct GNUNET_DB_EventHandler *eh)
482{
483 struct GNUNET_PQ_Context *db = eh->db;
484
485 GNUNET_assert (GNUNET_OK ==
486 GNUNET_CONTAINER_multishortmap_remove (db->channel_map,
487 &eh->sh,
488 eh));
489 if (NULL ==
490 GNUNET_CONTAINER_multishortmap_get (db->channel_map,
491 &eh->sh))
492 manage_subscribe (db,
493 "UNLISTEN X",
494 eh);
495 if (0 == GNUNET_CONTAINER_multishortmap_size (db->channel_map))
496 {
497 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
498 "Stopping PQ event scheduler job\n");
499 GNUNET_free (db->rfd);
500 if (NULL != db->event_task)
501 {
502 GNUNET_SCHEDULER_cancel (db->event_task);
503 db->event_task = NULL;
504 }
505 }
506 if (NULL != eh->timeout_task)
507 {
508 GNUNET_SCHEDULER_cancel (eh->timeout_task);
509 eh->timeout_task = NULL;
510 }
511 GNUNET_free (eh);
512}
513
514
515void
516GNUNET_PQ_event_notify (struct GNUNET_PQ_Context *db,
517 const struct GNUNET_DB_EventHeaderP *es,
518 const void *extra,
519 size_t extra_size)
520{
521 char sql[16 + 64 + extra_size * 8 / 5 + 8];
522 char *end;
523 PGresult *result;
524
525 end = stpcpy (sql,
526 "NOTIFY X");
527 end = es_to_channel (es,
528 end);
529 end = stpcpy (end,
530 ", '");
531 end = GNUNET_STRINGS_data_to_string (extra,
532 extra_size,
533 end,
534 sizeof (sql) - (end - sql) - 1);
535 GNUNET_assert (NULL != end);
536 *end = '\0';
537 end = stpcpy (end,
538 "'");
539 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
540 "Executing command `%s'\n",
541 sql);
542 result = PQexec (db->conn,
543 sql);
544 if (PGRES_COMMAND_OK != PQresultStatus (result))
545 {
546 GNUNET_log_from (GNUNET_ERROR_TYPE_ERROR,
547 "pq",
548 "Failed to execute `%s': %s/%s/%s/%s/%s",
549 sql,
550 PQresultErrorField (result,
551 PG_DIAG_MESSAGE_PRIMARY),
552 PQresultErrorField (result,
553 PG_DIAG_MESSAGE_DETAIL),
554 PQresultErrorMessage (result),
555 PQresStatus (PQresultStatus (result)),
556 PQerrorMessage (db->conn));
557 }
558 PQclear (result);
559 event_do_poll (db);
560}
561
562
563/* end of pq_event.c */