aboutsummaryrefslogtreecommitdiff
path: root/src/pq/pq_event.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/pq/pq_event.c')
-rw-r--r--src/pq/pq_event.c532
1 files changed, 0 insertions, 532 deletions
diff --git a/src/pq/pq_event.c b/src/pq/pq_event.c
deleted file mode 100644
index 79a2e80c6..000000000
--- a/src/pq/pq_event.c
+++ /dev/null
@@ -1,532 +0,0 @@
1/*
2 This file is part of GNUnet
3 Copyright (C) 2021 GNUnet e.V.
4
5 GNUnet is free software: you can redistribute it and/or modify it
6 under the terms of the GNU Affero General Public License as published
7 by the Free Software Foundation, either version 3 of the License,
8 or (at your option) any later version.
9
10 GNUnet is distributed in the hope that it will be useful, but
11 WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 Affero General Public License for more details.
14
15 You should have received a copy of the GNU Affero General Public License
16 along with this program. If not, see <http://www.gnu.org/licenses/>.
17
18 SPDX-License-Identifier: AGPL3.0-or-later
19 */
20/**
21 * @file pq/pq_event.c
22 * @brief event notifications via Postgres
23 * @author Christian Grothoff
24 */
25#include "platform.h"
26#include "pq.h"
27#include <pthread.h>
28
29
30/**
31 * Handle for an active LISTENer to the database.
32 */
33struct GNUNET_DB_EventHandler
34{
35 /**
36 * Channel name.
37 */
38 struct GNUNET_ShortHashCode sh;
39
40 /**
41 * Function to call on events.
42 */
43 GNUNET_DB_EventCallback cb;
44
45 /**
46 * Closure for @e cb.
47 */
48 void *cb_cls;
49
50 /**
51 * Database context this event handler is with.
52 */
53 struct GNUNET_PQ_Context *db;
54
55 /**
56 * Task to run on timeout.
57 */
58 struct GNUNET_SCHEDULER_Task *timeout_task;
59};
60
61
62/**
63 * Convert @a es to a short hash.
64 *
65 * @param es spec to hash to an identifier
66 * @param[out] sh short hash to set
67 */
68static void
69es_to_sh (const struct GNUNET_DB_EventHeaderP *es,
70 struct GNUNET_ShortHashCode *sh)
71{
72 struct GNUNET_HashCode h_channel;
73
74 GNUNET_CRYPTO_hash (es,
75 ntohs (es->size),
76 &h_channel);
77 GNUNET_static_assert (sizeof (*sh) <= sizeof (h_channel));
78 memcpy (sh,
79 &h_channel,
80 sizeof (*sh));
81}
82
83
84/**
85 * Convert @a sh to a Postgres identifier.
86 *
87 * @param sh short hash to convert to an identifier
88 * @param[out] identifier by default, Postgres supports
89 * NAMEDATALEN=64 character identifiers
90 * @return end position of the identifier
91 */
92static char *
93sh_to_channel (struct GNUNET_ShortHashCode *sh,
94 char identifier[64])
95{
96 char *end;
97
98 end = GNUNET_STRINGS_data_to_string (sh,
99 sizeof (*sh),
100 identifier,
101 63);
102 GNUNET_assert (NULL != end);
103 *end = '\0';
104 return end;
105}
106
107
108/**
109 * Convert @a sh to a Postgres identifier.
110 *
111 * @param identifier to convert
112 * @param[out] sh set to short hash
113 * @return #GNUNET_OK on success
114 */
115static enum GNUNET_GenericReturnValue
116channel_to_sh (const char *identifier,
117 struct GNUNET_ShortHashCode *sh)
118{
119 return GNUNET_STRINGS_string_to_data (identifier,
120 strlen (identifier),
121 sh,
122 sizeof (*sh));
123}
124
125
126/**
127 * Convert @a es to a Postgres identifier.
128 *
129 * @param es spec to hash to an identifier
130 * @param[out] identifier by default, Postgres supports
131 * NAMEDATALEN=64 character identifiers
132 * @return end position of the identifier
133 */
134static char *
135es_to_channel (const struct GNUNET_DB_EventHeaderP *es,
136 char identifier[64])
137{
138 struct GNUNET_ShortHashCode sh;
139
140 es_to_sh (es,
141 &sh);
142 return sh_to_channel (&sh,
143 identifier);
144}
145
146
147/**
148 * Closure for #do_notify().
149 */
150struct NotifyContext
151{
152 /**
153 * Extra argument of the notification, or NULL.
154 */
155 void *extra;
156
157 /**
158 * Number of bytes in @e extra.
159 */
160 size_t extra_size;
161};
162
163
164/**
165 * Function called on every event handler that
166 * needs to be triggered.
167 *
168 * @param cls a `struct NotifyContext`
169 * @param sh channel name
170 * @param value a `struct GNUNET_DB_EventHandler`
171 * @return #GNUNET_OK continue to iterate
172 */
173static int
174do_notify (void *cls,
175 const struct GNUNET_ShortHashCode *sh,
176 void *value)
177{
178 struct NotifyContext *ctx = cls;
179 struct GNUNET_DB_EventHandler *eh = value;
180
181 eh->cb (eh->cb_cls,
182 ctx->extra,
183 ctx->extra_size);
184 return GNUNET_OK;
185}
186
187
188static void
189event_do_poll (struct GNUNET_PQ_Context *db)
190{
191 PGnotify *n;
192
193 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
194 "PG poll job active\n");
195 if (1 !=
196 PQconsumeInput (db->conn))
197 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
198 "Failed to read from Postgres: %s\n",
199 PQerrorMessage (db->conn));
200 while (NULL != (n = PQnotifies (db->conn)))
201 {
202 struct GNUNET_ShortHashCode sh;
203 struct NotifyContext ctx = {
204 .extra = NULL
205 };
206
207 if ('X' != toupper ((int) n->relname[0]))
208 {
209 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
210 "Ignoring notification for unsupported channel identifier `%s'\n",
211 n->relname);
212 PQfreemem (n);
213 continue;
214 }
215 if (GNUNET_OK !=
216 channel_to_sh (&n->relname[1],
217 &sh))
218 {
219 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
220 "Ignoring notification for unsupported channel identifier `%s'\n",
221 n->relname);
222 PQfreemem (n);
223 continue;
224 }
225 if ( (NULL != n->extra) &&
226 (GNUNET_OK !=
227 GNUNET_STRINGS_string_to_data_alloc (n->extra,
228 strlen (n->extra),
229 &ctx.extra,
230 &ctx.extra_size)))
231 {
232 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
233 "Ignoring notification for unsupported extra data `%s' on channel `%s'\n",
234 n->extra,
235 n->relname);
236 PQfreemem (n);
237 continue;
238 }
239 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
240 "Received notification %s with extra data `%.*s'\n",
241 n->relname,
242 (int) ctx.extra_size,
243 (const char *) ctx.extra);
244 GNUNET_CONTAINER_multishortmap_get_multiple (db->channel_map,
245 &sh,
246 &do_notify,
247 &ctx);
248 GNUNET_free (ctx.extra);
249 PQfreemem (n);
250 }
251}
252
253
254/**
255 * The GNUnet scheduler notifies us that we need to
256 * trigger the DB event poller.
257 *
258 * @param cls a `struct GNUNET_PQ_Context *`
259 */
260static void
261do_scheduler_notify (void *cls)
262{
263 struct GNUNET_PQ_Context *db = cls;
264
265 db->event_task = NULL;
266 GNUNET_assert (NULL != db->rfd);
267 event_do_poll (db);
268 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
269 "Resubscribing\n");
270 db->event_task
271 = GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_FOREVER_REL,
272 db->rfd,
273 &do_scheduler_notify,
274 db);
275}
276
277
278/**
279 * Function called when the Postgres FD changes and we need
280 * to update the scheduler event loop task.
281 *
282 * @param cls a `struct GNUNET_PQ_Context *`
283 * @param fd the file descriptor, possibly -1
284 */
285static void
286scheduler_fd_cb (void *cls,
287 int fd)
288{
289 struct GNUNET_PQ_Context *db = cls;
290
291 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
292 "New poll FD is %d\n",
293 fd);
294 if (NULL != db->event_task)
295 {
296 GNUNET_SCHEDULER_cancel (db->event_task);
297 db->event_task = NULL;
298 }
299 GNUNET_free (db->rfd);
300 if (-1 == fd)
301 return;
302 if (0 == GNUNET_CONTAINER_multishortmap_size (db->channel_map))
303 return;
304 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
305 "Activating poll job on %d\n",
306 fd);
307 db->rfd = GNUNET_NETWORK_socket_box_native (fd);
308 db->event_task
309 = GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_ZERO,
310 db->rfd,
311 &do_scheduler_notify,
312 db);
313}
314
315
316/**
317 * Helper function to trigger an SQL @a cmd on @a db
318 *
319 * @param db database to send command to
320 * @param cmd prefix of the command to send
321 * @param eh details about the event
322 */
323static void
324manage_subscribe (struct GNUNET_PQ_Context *db,
325 const char *cmd,
326 struct GNUNET_DB_EventHandler *eh)
327{
328 char sql[16 + 64];
329 char *end;
330 PGresult *result;
331
332 end = stpcpy (sql,
333 cmd);
334 end = sh_to_channel (&eh->sh,
335 end);
336 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
337 "Executing PQ command `%s'\n",
338 sql);
339 result = PQexec (db->conn,
340 sql);
341 if (PGRES_COMMAND_OK != PQresultStatus (result))
342 {
343 GNUNET_log_from (GNUNET_ERROR_TYPE_ERROR,
344 "pq",
345 "Failed to execute `%s': %s/%s/%s/%s/%s",
346 sql,
347 PQresultErrorField (result,
348 PG_DIAG_MESSAGE_PRIMARY),
349 PQresultErrorField (result,
350 PG_DIAG_MESSAGE_DETAIL),
351 PQresultErrorMessage (result),
352 PQresStatus (PQresultStatus (result)),
353 PQerrorMessage (db->conn));
354 }
355 PQclear (result);
356}
357
358
359/**
360 * Re-subscribe to notifications after disconnect.
361 *
362 * @param cls the DB context
363 * @param sh the short hash of the channel
364 * @param eh the event handler
365 * @return #GNUNET_OK to continue to iterate
366 */
367static int
368register_notify (void *cls,
369 const struct GNUNET_ShortHashCode *sh,
370 void *value)
371{
372 struct GNUNET_PQ_Context *db = cls;
373 struct GNUNET_DB_EventHandler *eh = value;
374
375 manage_subscribe (db,
376 "LISTEN X",
377 eh);
378 return GNUNET_OK;
379}
380
381
382void
383GNUNET_PQ_event_reconnect_ (struct GNUNET_PQ_Context *db,
384 int fd)
385{
386 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
387 "Change in PQ event FD to %d\n",
388 fd);
389 scheduler_fd_cb (db,
390 fd);
391 GNUNET_CONTAINER_multishortmap_iterate (db->channel_map,
392 &register_notify,
393 db);
394}
395
396
397/**
398 * Function run on timeout for an event. Triggers
399 * the notification, but does NOT clear the handler.
400 *
401 * @param cls a `struct GNUNET_DB_EventHandler *`
402 */
403static void
404event_timeout (void *cls)
405{
406 struct GNUNET_DB_EventHandler *eh = cls;
407
408 eh->timeout_task = NULL;
409 eh->cb (eh->cb_cls,
410 NULL,
411 0);
412}
413
414
415struct GNUNET_DB_EventHandler *
416GNUNET_PQ_event_listen (struct GNUNET_PQ_Context *db,
417 const struct GNUNET_DB_EventHeaderP *es,
418 struct GNUNET_TIME_Relative timeout,
419 GNUNET_DB_EventCallback cb,
420 void *cb_cls)
421{
422 struct GNUNET_DB_EventHandler *eh;
423
424 eh = GNUNET_new (struct GNUNET_DB_EventHandler);
425 eh->db = db;
426 es_to_sh (es,
427 &eh->sh);
428 eh->cb = cb;
429 eh->cb_cls = cb_cls;
430 GNUNET_assert (GNUNET_OK ==
431 GNUNET_CONTAINER_multishortmap_put (db->channel_map,
432 &eh->sh,
433 eh,
434 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
435 if (NULL == db->event_task)
436 {
437 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
438 "Starting event scheduler\n");
439 scheduler_fd_cb (db,
440 PQsocket (db->conn));
441 }
442 manage_subscribe (db,
443 "LISTEN X",
444 eh);
445 eh->timeout_task = GNUNET_SCHEDULER_add_delayed (timeout,
446 &event_timeout,
447 eh);
448 return eh;
449}
450
451
452void
453GNUNET_PQ_event_listen_cancel (struct GNUNET_DB_EventHandler *eh)
454{
455 struct GNUNET_PQ_Context *db = eh->db;
456
457 GNUNET_assert (GNUNET_OK ==
458 GNUNET_CONTAINER_multishortmap_remove (db->channel_map,
459 &eh->sh,
460 eh));
461 manage_subscribe (db,
462 "UNLISTEN X",
463 eh);
464 if (0 == GNUNET_CONTAINER_multishortmap_size (db->channel_map))
465 {
466 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
467 "Stopping PQ event scheduler job\n");
468 GNUNET_free (db->rfd);
469 if (NULL != db->event_task)
470 {
471 GNUNET_SCHEDULER_cancel (db->event_task);
472 db->event_task = NULL;
473 }
474 }
475 if (NULL != eh->timeout_task)
476 {
477 GNUNET_SCHEDULER_cancel (eh->timeout_task);
478 eh->timeout_task = NULL;
479 }
480 GNUNET_free (eh);
481}
482
483
484void
485GNUNET_PQ_event_notify (struct GNUNET_PQ_Context *db,
486 const struct GNUNET_DB_EventHeaderP *es,
487 const void *extra,
488 size_t extra_size)
489{
490 char sql[16 + 64 + extra_size * 8 / 5 + 8];
491 char *end;
492 PGresult *result;
493
494 end = stpcpy (sql,
495 "NOTIFY X");
496 end = es_to_channel (es,
497 end);
498 end = stpcpy (end,
499 ", '");
500 end = GNUNET_STRINGS_data_to_string (extra,
501 extra_size,
502 end,
503 sizeof (sql) - (end - sql) - 1);
504 GNUNET_assert (NULL != end);
505 *end = '\0';
506 end = stpcpy (end,
507 "'");
508 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
509 "Executing command `%s'\n",
510 sql);
511 result = PQexec (db->conn,
512 sql);
513 if (PGRES_COMMAND_OK != PQresultStatus (result))
514 {
515 GNUNET_log_from (GNUNET_ERROR_TYPE_ERROR,
516 "pq",
517 "Failed to execute `%s': %s/%s/%s/%s/%s",
518 sql,
519 PQresultErrorField (result,
520 PG_DIAG_MESSAGE_PRIMARY),
521 PQresultErrorField (result,
522 PG_DIAG_MESSAGE_DETAIL),
523 PQresultErrorMessage (result),
524 PQresStatus (PQresultStatus (result)),
525 PQerrorMessage (db->conn));
526 }
527 PQclear (result);
528 event_do_poll (db);
529}
530
531
532/* end of pq_event.c */