diff options
author | Christian Grothoff <christian@grothoff.org> | 2021-08-25 12:52:12 +0200 |
---|---|---|
committer | Christian Grothoff <christian@grothoff.org> | 2021-08-25 12:52:12 +0200 |
commit | 45532e7bc19a2a73ad6da6502969c536d6831436 (patch) | |
tree | ea7dea0ae4bbe406b12903c7fca2f4af9eb05d0f /src | |
parent | 47d691ee2dcba940a9de6b95866c2da9c579ce94 (diff) | |
download | gnunet-45532e7bc19a2a73ad6da6502969c536d6831436.tar.gz gnunet-45532e7bc19a2a73ad6da6502969c536d6831436.zip |
further simplify libgnunetpq event API and implementation, also trigger events on 'loopback' that are not socket-activated
Diffstat (limited to 'src')
-rw-r--r-- | src/include/gnunet_pq_lib.h | 13 | ||||
-rw-r--r-- | src/pq/pq.h | 23 | ||||
-rw-r--r-- | src/pq/pq_event.c | 109 |
3 files changed, 65 insertions, 80 deletions
diff --git a/src/include/gnunet_pq_lib.h b/src/include/gnunet_pq_lib.h index ff4498938..fe3fabbea 100644 --- a/src/include/gnunet_pq_lib.h +++ b/src/include/gnunet_pq_lib.h | |||
@@ -853,19 +853,6 @@ GNUNET_PQ_reconnect (struct GNUNET_PQ_Context *db); | |||
853 | 853 | ||
854 | 854 | ||
855 | /** | 855 | /** |
856 | * Poll for database events now. Used if the event FD | ||
857 | * is ready and the application wants to trigger applicable | ||
858 | * events. | ||
859 | * Useful if the GNUnet scheduler is NOT to be used for | ||
860 | * such notifications. | ||
861 | * | ||
862 | * @param db database handle | ||
863 | */ | ||
864 | void | ||
865 | GNUNET_PQ_event_do_poll (struct GNUNET_PQ_Context *db); | ||
866 | |||
867 | |||
868 | /** | ||
869 | * Register callback to be invoked on events of type @a es. | 856 | * Register callback to be invoked on events of type @a es. |
870 | * | 857 | * |
871 | * Unlike many other calls, this function is thread-safe | 858 | * Unlike many other calls, this function is thread-safe |
diff --git a/src/pq/pq.h b/src/pq/pq.h index 950d38220..d10931d99 100644 --- a/src/pq/pq.h +++ b/src/pq/pq.h | |||
@@ -73,11 +73,6 @@ struct GNUNET_PQ_Context | |||
73 | * File descriptor wrapper for @e event_task. | 73 | * File descriptor wrapper for @e event_task. |
74 | */ | 74 | */ |
75 | struct GNUNET_NETWORK_Handle *rfd; | 75 | struct GNUNET_NETWORK_Handle *rfd; |
76 | |||
77 | /** | ||
78 | * Is scheduling via the GNUnet scheduler desired? | ||
79 | */ | ||
80 | bool scheduler_on; | ||
81 | }; | 76 | }; |
82 | 77 | ||
83 | 78 | ||
@@ -93,22 +88,4 @@ GNUNET_PQ_event_reconnect_ (struct GNUNET_PQ_Context *db, | |||
93 | int fd); | 88 | int fd); |
94 | 89 | ||
95 | 90 | ||
96 | /** | ||
97 | * Run poll event loop using the GNUnet scheduler. | ||
98 | * | ||
99 | * @param db database handle | ||
100 | */ | ||
101 | void | ||
102 | GNUNET_PQ_event_scheduler_start_ (struct GNUNET_PQ_Context *db); | ||
103 | |||
104 | |||
105 | /** | ||
106 | * Stop running poll event loop using the GNUnet scheduler. | ||
107 | * | ||
108 | * @param db database handle | ||
109 | */ | ||
110 | void | ||
111 | GNUNET_PQ_event_scheduler_stop_ (struct GNUNET_PQ_Context *db); | ||
112 | |||
113 | |||
114 | #endif | 91 | #endif |
diff --git a/src/pq/pq_event.c b/src/pq/pq_event.c index 3a0bfcde3..79a2e80c6 100644 --- a/src/pq/pq_event.c +++ b/src/pq/pq_event.c | |||
@@ -106,6 +106,24 @@ sh_to_channel (struct GNUNET_ShortHashCode *sh, | |||
106 | 106 | ||
107 | 107 | ||
108 | /** | 108 | /** |
109 | * Convert @a sh to a Postgres identifier. | ||
110 | * | ||
111 | * @param identifier to convert | ||
112 | * @param[out] sh set to short hash | ||
113 | * @return #GNUNET_OK on success | ||
114 | */ | ||
115 | static enum GNUNET_GenericReturnValue | ||
116 | channel_to_sh (const char *identifier, | ||
117 | struct GNUNET_ShortHashCode *sh) | ||
118 | { | ||
119 | return GNUNET_STRINGS_string_to_data (identifier, | ||
120 | strlen (identifier), | ||
121 | sh, | ||
122 | sizeof (*sh)); | ||
123 | } | ||
124 | |||
125 | |||
126 | /** | ||
109 | * Convert @a es to a Postgres identifier. | 127 | * Convert @a es to a Postgres identifier. |
110 | * | 128 | * |
111 | * @param es spec to hash to an identifier | 129 | * @param es spec to hash to an identifier |
@@ -167,11 +185,13 @@ do_notify (void *cls, | |||
167 | } | 185 | } |
168 | 186 | ||
169 | 187 | ||
170 | void | 188 | static void |
171 | GNUNET_PQ_event_do_poll (struct GNUNET_PQ_Context *db) | 189 | event_do_poll (struct GNUNET_PQ_Context *db) |
172 | { | 190 | { |
173 | PGnotify *n; | 191 | PGnotify *n; |
174 | 192 | ||
193 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, | ||
194 | "PG poll job active\n"); | ||
175 | if (1 != | 195 | if (1 != |
176 | PQconsumeInput (db->conn)) | 196 | PQconsumeInput (db->conn)) |
177 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, | 197 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, |
@@ -193,10 +213,8 @@ GNUNET_PQ_event_do_poll (struct GNUNET_PQ_Context *db) | |||
193 | continue; | 213 | continue; |
194 | } | 214 | } |
195 | if (GNUNET_OK != | 215 | if (GNUNET_OK != |
196 | GNUNET_STRINGS_string_to_data (&n->relname[1], | 216 | channel_to_sh (&n->relname[1], |
197 | strlen (&n->relname[1]), | 217 | &sh)) |
198 | &sh, | ||
199 | sizeof (sh))) | ||
200 | { | 218 | { |
201 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | 219 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, |
202 | "Ignoring notification for unsupported channel identifier `%s'\n", | 220 | "Ignoring notification for unsupported channel identifier `%s'\n", |
@@ -218,9 +236,15 @@ GNUNET_PQ_event_do_poll (struct GNUNET_PQ_Context *db) | |||
218 | PQfreemem (n); | 236 | PQfreemem (n); |
219 | continue; | 237 | continue; |
220 | } | 238 | } |
221 | GNUNET_CONTAINER_multishortmap_iterate (db->channel_map, | 239 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, |
222 | &do_notify, | 240 | "Received notification %s with extra data `%.*s'\n", |
223 | &ctx); | 241 | n->relname, |
242 | (int) ctx.extra_size, | ||
243 | (const char *) ctx.extra); | ||
244 | GNUNET_CONTAINER_multishortmap_get_multiple (db->channel_map, | ||
245 | &sh, | ||
246 | &do_notify, | ||
247 | &ctx); | ||
224 | GNUNET_free (ctx.extra); | 248 | GNUNET_free (ctx.extra); |
225 | PQfreemem (n); | 249 | PQfreemem (n); |
226 | } | 250 | } |
@@ -238,9 +262,11 @@ do_scheduler_notify (void *cls) | |||
238 | { | 262 | { |
239 | struct GNUNET_PQ_Context *db = cls; | 263 | struct GNUNET_PQ_Context *db = cls; |
240 | 264 | ||
241 | GNUNET_assert (db->scheduler_on); | 265 | db->event_task = NULL; |
242 | GNUNET_assert (NULL != db->rfd); | 266 | GNUNET_assert (NULL != db->rfd); |
243 | GNUNET_PQ_event_do_poll (db); | 267 | event_do_poll (db); |
268 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, | ||
269 | "Resubscribing\n"); | ||
244 | db->event_task | 270 | db->event_task |
245 | = GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_FOREVER_REL, | 271 | = GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_FOREVER_REL, |
246 | db->rfd, | 272 | db->rfd, |
@@ -262,6 +288,9 @@ scheduler_fd_cb (void *cls, | |||
262 | { | 288 | { |
263 | struct GNUNET_PQ_Context *db = cls; | 289 | struct GNUNET_PQ_Context *db = cls; |
264 | 290 | ||
291 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, | ||
292 | "New poll FD is %d\n", | ||
293 | fd); | ||
265 | if (NULL != db->event_task) | 294 | if (NULL != db->event_task) |
266 | { | 295 | { |
267 | GNUNET_SCHEDULER_cancel (db->event_task); | 296 | GNUNET_SCHEDULER_cancel (db->event_task); |
@@ -272,6 +301,9 @@ scheduler_fd_cb (void *cls, | |||
272 | return; | 301 | return; |
273 | if (0 == GNUNET_CONTAINER_multishortmap_size (db->channel_map)) | 302 | if (0 == GNUNET_CONTAINER_multishortmap_size (db->channel_map)) |
274 | return; | 303 | return; |
304 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, | ||
305 | "Activating poll job on %d\n", | ||
306 | fd); | ||
275 | db->rfd = GNUNET_NETWORK_socket_box_native (fd); | 307 | db->rfd = GNUNET_NETWORK_socket_box_native (fd); |
276 | db->event_task | 308 | db->event_task |
277 | = GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_ZERO, | 309 | = GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_ZERO, |
@@ -281,31 +313,6 @@ scheduler_fd_cb (void *cls, | |||
281 | } | 313 | } |
282 | 314 | ||
283 | 315 | ||
284 | void | ||
285 | GNUNET_PQ_event_scheduler_start_ (struct GNUNET_PQ_Context *db) | ||
286 | { | ||
287 | if (db->scheduler_on) | ||
288 | return; | ||
289 | db->scheduler_on = true; | ||
290 | scheduler_fd_cb (db, | ||
291 | PQsocket (db->conn)); | ||
292 | } | ||
293 | |||
294 | |||
295 | void | ||
296 | GNUNET_PQ_event_scheduler_stop_ (struct GNUNET_PQ_Context *db) | ||
297 | { | ||
298 | GNUNET_assert (db->scheduler_on); | ||
299 | GNUNET_free (db->rfd); | ||
300 | db->scheduler_on = false; | ||
301 | if (NULL != db->event_task) | ||
302 | { | ||
303 | GNUNET_SCHEDULER_cancel (db->event_task); | ||
304 | db->event_task = NULL; | ||
305 | } | ||
306 | } | ||
307 | |||
308 | |||
309 | /** | 316 | /** |
310 | * Helper function to trigger an SQL @a cmd on @a db | 317 | * Helper function to trigger an SQL @a cmd on @a db |
311 | * | 318 | * |
@@ -376,8 +383,9 @@ void | |||
376 | GNUNET_PQ_event_reconnect_ (struct GNUNET_PQ_Context *db, | 383 | GNUNET_PQ_event_reconnect_ (struct GNUNET_PQ_Context *db, |
377 | int fd) | 384 | int fd) |
378 | { | 385 | { |
379 | if (! db->scheduler_on) | 386 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, |
380 | return; | 387 | "Change in PQ event FD to %d\n", |
388 | fd); | ||
381 | scheduler_fd_cb (db, | 389 | scheduler_fd_cb (db, |
382 | fd); | 390 | fd); |
383 | GNUNET_CONTAINER_multishortmap_iterate (db->channel_map, | 391 | GNUNET_CONTAINER_multishortmap_iterate (db->channel_map, |
@@ -412,7 +420,6 @@ GNUNET_PQ_event_listen (struct GNUNET_PQ_Context *db, | |||
412 | void *cb_cls) | 420 | void *cb_cls) |
413 | { | 421 | { |
414 | struct GNUNET_DB_EventHandler *eh; | 422 | struct GNUNET_DB_EventHandler *eh; |
415 | bool was_zero; | ||
416 | 423 | ||
417 | eh = GNUNET_new (struct GNUNET_DB_EventHandler); | 424 | eh = GNUNET_new (struct GNUNET_DB_EventHandler); |
418 | eh->db = db; | 425 | eh->db = db; |
@@ -420,14 +427,18 @@ GNUNET_PQ_event_listen (struct GNUNET_PQ_Context *db, | |||
420 | &eh->sh); | 427 | &eh->sh); |
421 | eh->cb = cb; | 428 | eh->cb = cb; |
422 | eh->cb_cls = cb_cls; | 429 | eh->cb_cls = cb_cls; |
423 | was_zero = (0 == GNUNET_CONTAINER_multishortmap_size (db->channel_map)); | ||
424 | GNUNET_assert (GNUNET_OK == | 430 | GNUNET_assert (GNUNET_OK == |
425 | GNUNET_CONTAINER_multishortmap_put (db->channel_map, | 431 | GNUNET_CONTAINER_multishortmap_put (db->channel_map, |
426 | &eh->sh, | 432 | &eh->sh, |
427 | eh, | 433 | eh, |
428 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE)); | 434 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE)); |
429 | if (was_zero) | 435 | if (NULL == db->event_task) |
430 | GNUNET_PQ_event_scheduler_start_ (db); | 436 | { |
437 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, | ||
438 | "Starting event scheduler\n"); | ||
439 | scheduler_fd_cb (db, | ||
440 | PQsocket (db->conn)); | ||
441 | } | ||
431 | manage_subscribe (db, | 442 | manage_subscribe (db, |
432 | "LISTEN X", | 443 | "LISTEN X", |
433 | eh); | 444 | eh); |
@@ -447,13 +458,19 @@ GNUNET_PQ_event_listen_cancel (struct GNUNET_DB_EventHandler *eh) | |||
447 | GNUNET_CONTAINER_multishortmap_remove (db->channel_map, | 458 | GNUNET_CONTAINER_multishortmap_remove (db->channel_map, |
448 | &eh->sh, | 459 | &eh->sh, |
449 | eh)); | 460 | eh)); |
450 | |||
451 | manage_subscribe (db, | 461 | manage_subscribe (db, |
452 | "UNLISTEN X", | 462 | "UNLISTEN X", |
453 | eh); | 463 | eh); |
454 | if (0 == GNUNET_CONTAINER_multishortmap_size (db->channel_map)) | 464 | if (0 == GNUNET_CONTAINER_multishortmap_size (db->channel_map)) |
455 | { | 465 | { |
456 | GNUNET_PQ_event_scheduler_stop_ (db); | 466 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, |
467 | "Stopping PQ event scheduler job\n"); | ||
468 | GNUNET_free (db->rfd); | ||
469 | if (NULL != db->event_task) | ||
470 | { | ||
471 | GNUNET_SCHEDULER_cancel (db->event_task); | ||
472 | db->event_task = NULL; | ||
473 | } | ||
457 | } | 474 | } |
458 | if (NULL != eh->timeout_task) | 475 | if (NULL != eh->timeout_task) |
459 | { | 476 | { |
@@ -488,6 +505,9 @@ GNUNET_PQ_event_notify (struct GNUNET_PQ_Context *db, | |||
488 | *end = '\0'; | 505 | *end = '\0'; |
489 | end = stpcpy (end, | 506 | end = stpcpy (end, |
490 | "'"); | 507 | "'"); |
508 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, | ||
509 | "Executing command `%s'\n", | ||
510 | sql); | ||
491 | result = PQexec (db->conn, | 511 | result = PQexec (db->conn, |
492 | sql); | 512 | sql); |
493 | if (PGRES_COMMAND_OK != PQresultStatus (result)) | 513 | if (PGRES_COMMAND_OK != PQresultStatus (result)) |
@@ -505,6 +525,7 @@ GNUNET_PQ_event_notify (struct GNUNET_PQ_Context *db, | |||
505 | PQerrorMessage (db->conn)); | 525 | PQerrorMessage (db->conn)); |
506 | } | 526 | } |
507 | PQclear (result); | 527 | PQclear (result); |
528 | event_do_poll (db); | ||
508 | } | 529 | } |
509 | 530 | ||
510 | 531 | ||