aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2021-08-25 12:52:12 +0200
committerChristian Grothoff <christian@grothoff.org>2021-08-25 12:52:12 +0200
commit45532e7bc19a2a73ad6da6502969c536d6831436 (patch)
treeea7dea0ae4bbe406b12903c7fca2f4af9eb05d0f
parent47d691ee2dcba940a9de6b95866c2da9c579ce94 (diff)
downloadgnunet-45532e7bc19a2a73ad6da6502969c536d6831436.tar.gz
gnunet-45532e7bc19a2a73ad6da6502969c536d6831436.zip
further simplify libgnunetpq event API and implementation, also trigger events on 'loopback' that are not socket-activated
-rw-r--r--src/include/gnunet_pq_lib.h13
-rw-r--r--src/pq/pq.h23
-rw-r--r--src/pq/pq_event.c109
3 files changed, 65 insertions, 80 deletions
diff --git a/src/include/gnunet_pq_lib.h b/src/include/gnunet_pq_lib.h
index ff4498938..fe3fabbea 100644
--- a/src/include/gnunet_pq_lib.h
+++ b/src/include/gnunet_pq_lib.h
@@ -853,19 +853,6 @@ GNUNET_PQ_reconnect (struct GNUNET_PQ_Context *db);
853 853
854 854
855/** 855/**
856 * Poll for database events now. Used if the event FD
857 * is ready and the application wants to trigger applicable
858 * events.
859 * Useful if the GNUnet scheduler is NOT to be used for
860 * such notifications.
861 *
862 * @param db database handle
863 */
864void
865GNUNET_PQ_event_do_poll (struct GNUNET_PQ_Context *db);
866
867
868/**
869 * Register callback to be invoked on events of type @a es. 856 * Register callback to be invoked on events of type @a es.
870 * 857 *
871 * Unlike many other calls, this function is thread-safe 858 * Unlike many other calls, this function is thread-safe
diff --git a/src/pq/pq.h b/src/pq/pq.h
index 950d38220..d10931d99 100644
--- a/src/pq/pq.h
+++ b/src/pq/pq.h
@@ -73,11 +73,6 @@ struct GNUNET_PQ_Context
73 * File descriptor wrapper for @e event_task. 73 * File descriptor wrapper for @e event_task.
74 */ 74 */
75 struct GNUNET_NETWORK_Handle *rfd; 75 struct GNUNET_NETWORK_Handle *rfd;
76
77 /**
78 * Is scheduling via the GNUnet scheduler desired?
79 */
80 bool scheduler_on;
81}; 76};
82 77
83 78
@@ -93,22 +88,4 @@ GNUNET_PQ_event_reconnect_ (struct GNUNET_PQ_Context *db,
93 int fd); 88 int fd);
94 89
95 90
96/**
97 * Run poll event loop using the GNUnet scheduler.
98 *
99 * @param db database handle
100 */
101void
102GNUNET_PQ_event_scheduler_start_ (struct GNUNET_PQ_Context *db);
103
104
105/**
106 * Stop running poll event loop using the GNUnet scheduler.
107 *
108 * @param db database handle
109 */
110void
111GNUNET_PQ_event_scheduler_stop_ (struct GNUNET_PQ_Context *db);
112
113
114#endif 91#endif
diff --git a/src/pq/pq_event.c b/src/pq/pq_event.c
index 3a0bfcde3..79a2e80c6 100644
--- a/src/pq/pq_event.c
+++ b/src/pq/pq_event.c
@@ -106,6 +106,24 @@ sh_to_channel (struct GNUNET_ShortHashCode *sh,
106 106
107 107
108/** 108/**
109 * Convert @a sh to a Postgres identifier.
110 *
111 * @param identifier to convert
112 * @param[out] sh set to short hash
113 * @return #GNUNET_OK on success
114 */
115static enum GNUNET_GenericReturnValue
116channel_to_sh (const char *identifier,
117 struct GNUNET_ShortHashCode *sh)
118{
119 return GNUNET_STRINGS_string_to_data (identifier,
120 strlen (identifier),
121 sh,
122 sizeof (*sh));
123}
124
125
126/**
109 * Convert @a es to a Postgres identifier. 127 * Convert @a es to a Postgres identifier.
110 * 128 *
111 * @param es spec to hash to an identifier 129 * @param es spec to hash to an identifier
@@ -167,11 +185,13 @@ do_notify (void *cls,
167} 185}
168 186
169 187
170void 188static void
171GNUNET_PQ_event_do_poll (struct GNUNET_PQ_Context *db) 189event_do_poll (struct GNUNET_PQ_Context *db)
172{ 190{
173 PGnotify *n; 191 PGnotify *n;
174 192
193 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
194 "PG poll job active\n");
175 if (1 != 195 if (1 !=
176 PQconsumeInput (db->conn)) 196 PQconsumeInput (db->conn))
177 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 197 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
@@ -193,10 +213,8 @@ GNUNET_PQ_event_do_poll (struct GNUNET_PQ_Context *db)
193 continue; 213 continue;
194 } 214 }
195 if (GNUNET_OK != 215 if (GNUNET_OK !=
196 GNUNET_STRINGS_string_to_data (&n->relname[1], 216 channel_to_sh (&n->relname[1],
197 strlen (&n->relname[1]), 217 &sh))
198 &sh,
199 sizeof (sh)))
200 { 218 {
201 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 219 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
202 "Ignoring notification for unsupported channel identifier `%s'\n", 220 "Ignoring notification for unsupported channel identifier `%s'\n",
@@ -218,9 +236,15 @@ GNUNET_PQ_event_do_poll (struct GNUNET_PQ_Context *db)
218 PQfreemem (n); 236 PQfreemem (n);
219 continue; 237 continue;
220 } 238 }
221 GNUNET_CONTAINER_multishortmap_iterate (db->channel_map, 239 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
222 &do_notify, 240 "Received notification %s with extra data `%.*s'\n",
223 &ctx); 241 n->relname,
242 (int) ctx.extra_size,
243 (const char *) ctx.extra);
244 GNUNET_CONTAINER_multishortmap_get_multiple (db->channel_map,
245 &sh,
246 &do_notify,
247 &ctx);
224 GNUNET_free (ctx.extra); 248 GNUNET_free (ctx.extra);
225 PQfreemem (n); 249 PQfreemem (n);
226 } 250 }
@@ -238,9 +262,11 @@ do_scheduler_notify (void *cls)
238{ 262{
239 struct GNUNET_PQ_Context *db = cls; 263 struct GNUNET_PQ_Context *db = cls;
240 264
241 GNUNET_assert (db->scheduler_on); 265 db->event_task = NULL;
242 GNUNET_assert (NULL != db->rfd); 266 GNUNET_assert (NULL != db->rfd);
243 GNUNET_PQ_event_do_poll (db); 267 event_do_poll (db);
268 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
269 "Resubscribing\n");
244 db->event_task 270 db->event_task
245 = GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_FOREVER_REL, 271 = GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_FOREVER_REL,
246 db->rfd, 272 db->rfd,
@@ -262,6 +288,9 @@ scheduler_fd_cb (void *cls,
262{ 288{
263 struct GNUNET_PQ_Context *db = cls; 289 struct GNUNET_PQ_Context *db = cls;
264 290
291 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
292 "New poll FD is %d\n",
293 fd);
265 if (NULL != db->event_task) 294 if (NULL != db->event_task)
266 { 295 {
267 GNUNET_SCHEDULER_cancel (db->event_task); 296 GNUNET_SCHEDULER_cancel (db->event_task);
@@ -272,6 +301,9 @@ scheduler_fd_cb (void *cls,
272 return; 301 return;
273 if (0 == GNUNET_CONTAINER_multishortmap_size (db->channel_map)) 302 if (0 == GNUNET_CONTAINER_multishortmap_size (db->channel_map))
274 return; 303 return;
304 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
305 "Activating poll job on %d\n",
306 fd);
275 db->rfd = GNUNET_NETWORK_socket_box_native (fd); 307 db->rfd = GNUNET_NETWORK_socket_box_native (fd);
276 db->event_task 308 db->event_task
277 = GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_ZERO, 309 = GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_ZERO,
@@ -281,31 +313,6 @@ scheduler_fd_cb (void *cls,
281} 313}
282 314
283 315
284void
285GNUNET_PQ_event_scheduler_start_ (struct GNUNET_PQ_Context *db)
286{
287 if (db->scheduler_on)
288 return;
289 db->scheduler_on = true;
290 scheduler_fd_cb (db,
291 PQsocket (db->conn));
292}
293
294
295void
296GNUNET_PQ_event_scheduler_stop_ (struct GNUNET_PQ_Context *db)
297{
298 GNUNET_assert (db->scheduler_on);
299 GNUNET_free (db->rfd);
300 db->scheduler_on = false;
301 if (NULL != db->event_task)
302 {
303 GNUNET_SCHEDULER_cancel (db->event_task);
304 db->event_task = NULL;
305 }
306}
307
308
309/** 316/**
310 * Helper function to trigger an SQL @a cmd on @a db 317 * Helper function to trigger an SQL @a cmd on @a db
311 * 318 *
@@ -376,8 +383,9 @@ void
376GNUNET_PQ_event_reconnect_ (struct GNUNET_PQ_Context *db, 383GNUNET_PQ_event_reconnect_ (struct GNUNET_PQ_Context *db,
377 int fd) 384 int fd)
378{ 385{
379 if (! db->scheduler_on) 386 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
380 return; 387 "Change in PQ event FD to %d\n",
388 fd);
381 scheduler_fd_cb (db, 389 scheduler_fd_cb (db,
382 fd); 390 fd);
383 GNUNET_CONTAINER_multishortmap_iterate (db->channel_map, 391 GNUNET_CONTAINER_multishortmap_iterate (db->channel_map,
@@ -412,7 +420,6 @@ GNUNET_PQ_event_listen (struct GNUNET_PQ_Context *db,
412 void *cb_cls) 420 void *cb_cls)
413{ 421{
414 struct GNUNET_DB_EventHandler *eh; 422 struct GNUNET_DB_EventHandler *eh;
415 bool was_zero;
416 423
417 eh = GNUNET_new (struct GNUNET_DB_EventHandler); 424 eh = GNUNET_new (struct GNUNET_DB_EventHandler);
418 eh->db = db; 425 eh->db = db;
@@ -420,14 +427,18 @@ GNUNET_PQ_event_listen (struct GNUNET_PQ_Context *db,
420 &eh->sh); 427 &eh->sh);
421 eh->cb = cb; 428 eh->cb = cb;
422 eh->cb_cls = cb_cls; 429 eh->cb_cls = cb_cls;
423 was_zero = (0 == GNUNET_CONTAINER_multishortmap_size (db->channel_map));
424 GNUNET_assert (GNUNET_OK == 430 GNUNET_assert (GNUNET_OK ==
425 GNUNET_CONTAINER_multishortmap_put (db->channel_map, 431 GNUNET_CONTAINER_multishortmap_put (db->channel_map,
426 &eh->sh, 432 &eh->sh,
427 eh, 433 eh,
428 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE)); 434 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
429 if (was_zero) 435 if (NULL == db->event_task)
430 GNUNET_PQ_event_scheduler_start_ (db); 436 {
437 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
438 "Starting event scheduler\n");
439 scheduler_fd_cb (db,
440 PQsocket (db->conn));
441 }
431 manage_subscribe (db, 442 manage_subscribe (db,
432 "LISTEN X", 443 "LISTEN X",
433 eh); 444 eh);
@@ -447,13 +458,19 @@ GNUNET_PQ_event_listen_cancel (struct GNUNET_DB_EventHandler *eh)
447 GNUNET_CONTAINER_multishortmap_remove (db->channel_map, 458 GNUNET_CONTAINER_multishortmap_remove (db->channel_map,
448 &eh->sh, 459 &eh->sh,
449 eh)); 460 eh));
450
451 manage_subscribe (db, 461 manage_subscribe (db,
452 "UNLISTEN X", 462 "UNLISTEN X",
453 eh); 463 eh);
454 if (0 == GNUNET_CONTAINER_multishortmap_size (db->channel_map)) 464 if (0 == GNUNET_CONTAINER_multishortmap_size (db->channel_map))
455 { 465 {
456 GNUNET_PQ_event_scheduler_stop_ (db); 466 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
467 "Stopping PQ event scheduler job\n");
468 GNUNET_free (db->rfd);
469 if (NULL != db->event_task)
470 {
471 GNUNET_SCHEDULER_cancel (db->event_task);
472 db->event_task = NULL;
473 }
457 } 474 }
458 if (NULL != eh->timeout_task) 475 if (NULL != eh->timeout_task)
459 { 476 {
@@ -488,6 +505,9 @@ GNUNET_PQ_event_notify (struct GNUNET_PQ_Context *db,
488 *end = '\0'; 505 *end = '\0';
489 end = stpcpy (end, 506 end = stpcpy (end,
490 "'"); 507 "'");
508 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
509 "Executing command `%s'\n",
510 sql);
491 result = PQexec (db->conn, 511 result = PQexec (db->conn,
492 sql); 512 sql);
493 if (PGRES_COMMAND_OK != PQresultStatus (result)) 513 if (PGRES_COMMAND_OK != PQresultStatus (result))
@@ -505,6 +525,7 @@ GNUNET_PQ_event_notify (struct GNUNET_PQ_Context *db,
505 PQerrorMessage (db->conn)); 525 PQerrorMessage (db->conn));
506 } 526 }
507 PQclear (result); 527 PQclear (result);
528 event_do_poll (db);
508} 529}
509 530
510 531