aboutsummaryrefslogtreecommitdiff
path: root/src/pq/pq_event.c
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2021-08-22 23:59:54 +0200
committerChristian Grothoff <christian@grothoff.org>2021-08-22 23:59:54 +0200
commitea901fb4978ee7e9cfd2f74c810f2146bdf9d46b (patch)
treef2bbad92955ed144bc6afa9a03ece372c20a874a /src/pq/pq_event.c
parent9ef7f0704fa0458f2e27ba188aec5102dbb780b2 (diff)
downloadgnunet-ea901fb4978ee7e9cfd2f74c810f2146bdf9d46b.tar.gz
gnunet-ea901fb4978ee7e9cfd2f74c810f2146bdf9d46b.zip
-simplify libgnunetpq to only support single-threaded applications that do use the scheudler (when using event API)
Diffstat (limited to 'src/pq/pq_event.c')
-rw-r--r--src/pq/pq_event.c145
1 files changed, 67 insertions, 78 deletions
diff --git a/src/pq/pq_event.c b/src/pq/pq_event.c
index 2890869a3..3a0bfcde3 100644
--- a/src/pq/pq_event.c
+++ b/src/pq/pq_event.c
@@ -51,6 +51,11 @@ struct GNUNET_DB_EventHandler
51 * Database context this event handler is with. 51 * Database context this event handler is with.
52 */ 52 */
53 struct GNUNET_PQ_Context *db; 53 struct GNUNET_PQ_Context *db;
54
55 /**
56 * Task to run on timeout.
57 */
58 struct GNUNET_SCHEDULER_Task *timeout_task;
54}; 59};
55 60
56 61
@@ -163,35 +168,10 @@ do_notify (void *cls,
163 168
164 169
165void 170void
166GNUNET_PQ_event_set_socket_callback (struct GNUNET_PQ_Context *db,
167 GNUNET_PQ_SocketCallback sc,
168 void *sc_cls)
169{
170 int fd;
171
172 db->sc = sc;
173 db->sc_cls = sc_cls;
174 if (NULL == sc)
175 return;
176 GNUNET_assert (0 ==
177 pthread_mutex_lock (&db->notify_lock));
178 fd = PQsocket (db->conn);
179 if ( (-1 != fd) &&
180 (0 != GNUNET_CONTAINER_multishortmap_size (db->channel_map)) )
181 sc (sc_cls,
182 fd);
183 GNUNET_assert (0 ==
184 pthread_mutex_unlock (&db->notify_lock));
185}
186
187
188void
189GNUNET_PQ_event_do_poll (struct GNUNET_PQ_Context *db) 171GNUNET_PQ_event_do_poll (struct GNUNET_PQ_Context *db)
190{ 172{
191 PGnotify *n; 173 PGnotify *n;
192 174
193 GNUNET_assert (0 ==
194 pthread_mutex_lock (&db->notify_lock));
195 if (1 != 175 if (1 !=
196 PQconsumeInput (db->conn)) 176 PQconsumeInput (db->conn))
197 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 177 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
@@ -204,9 +184,17 @@ GNUNET_PQ_event_do_poll (struct GNUNET_PQ_Context *db)
204 .extra = NULL 184 .extra = NULL
205 }; 185 };
206 186
187 if ('X' != toupper ((int) n->relname[0]))
188 {
189 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
190 "Ignoring notification for unsupported channel identifier `%s'\n",
191 n->relname);
192 PQfreemem (n);
193 continue;
194 }
207 if (GNUNET_OK != 195 if (GNUNET_OK !=
208 GNUNET_STRINGS_string_to_data (n->relname, 196 GNUNET_STRINGS_string_to_data (&n->relname[1],
209 strlen (n->relname), 197 strlen (&n->relname[1]),
210 &sh, 198 &sh,
211 sizeof (sh))) 199 sizeof (sh)))
212 { 200 {
@@ -236,24 +224,10 @@ GNUNET_PQ_event_do_poll (struct GNUNET_PQ_Context *db)
236 GNUNET_free (ctx.extra); 224 GNUNET_free (ctx.extra);
237 PQfreemem (n); 225 PQfreemem (n);
238 } 226 }
239 GNUNET_assert (0 ==
240 pthread_mutex_unlock (&db->notify_lock));
241} 227}
242 228
243 229
244/** 230/**
245 * Function called when the Postgres FD changes and we need
246 * to update the scheduler event loop task.
247 *
248 * @param cls a `struct GNUNET_PQ_Context *`
249 * @param fd the file descriptor, possibly -1
250 */
251static void
252scheduler_fd_cb (void *cls,
253 int fd);
254
255
256/**
257 * The GNUnet scheduler notifies us that we need to 231 * The GNUnet scheduler notifies us that we need to
258 * trigger the DB event poller. 232 * trigger the DB event poller.
259 * 233 *
@@ -308,27 +282,21 @@ scheduler_fd_cb (void *cls,
308 282
309 283
310void 284void
311GNUNET_PQ_event_scheduler_start (struct GNUNET_PQ_Context *db) 285GNUNET_PQ_event_scheduler_start_ (struct GNUNET_PQ_Context *db)
312{ 286{
313 int fd; 287 if (db->scheduler_on)
314 288 return;
315 GNUNET_assert (! db->scheduler_on);
316 GNUNET_assert (NULL == db->sc);
317 db->scheduler_on = true; 289 db->scheduler_on = true;
318 db->sc = &scheduler_fd_cb;
319 db->sc_cls = db;
320 fd = PQsocket (db->conn);
321 scheduler_fd_cb (db, 290 scheduler_fd_cb (db,
322 fd); 291 PQsocket (db->conn));
323} 292}
324 293
325 294
326void 295void
327GNUNET_PQ_event_scheduler_stop (struct GNUNET_PQ_Context *db) 296GNUNET_PQ_event_scheduler_stop_ (struct GNUNET_PQ_Context *db)
328{ 297{
329 GNUNET_assert (db->scheduler_on); 298 GNUNET_assert (db->scheduler_on);
330 GNUNET_free (db->rfd); 299 GNUNET_free (db->rfd);
331 db->sc = NULL;
332 db->scheduler_on = false; 300 db->scheduler_on = false;
333 if (NULL != db->event_task) 301 if (NULL != db->event_task)
334 { 302 {
@@ -338,6 +306,13 @@ GNUNET_PQ_event_scheduler_stop (struct GNUNET_PQ_Context *db)
338} 306}
339 307
340 308
309/**
310 * Helper function to trigger an SQL @a cmd on @a db
311 *
312 * @param db database to send command to
313 * @param cmd prefix of the command to send
314 * @param eh details about the event
315 */
341static void 316static void
342manage_subscribe (struct GNUNET_PQ_Context *db, 317manage_subscribe (struct GNUNET_PQ_Context *db,
343 const char *cmd, 318 const char *cmd,
@@ -351,6 +326,9 @@ manage_subscribe (struct GNUNET_PQ_Context *db,
351 cmd); 326 cmd);
352 end = sh_to_channel (&eh->sh, 327 end = sh_to_channel (&eh->sh,
353 end); 328 end);
329 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
330 "Executing PQ command `%s'\n",
331 sql);
354 result = PQexec (db->conn, 332 result = PQexec (db->conn,
355 sql); 333 sql);
356 if (PGRES_COMMAND_OK != PQresultStatus (result)) 334 if (PGRES_COMMAND_OK != PQresultStatus (result))
@@ -395,21 +373,41 @@ register_notify (void *cls,
395 373
396 374
397void 375void
398GNUNET_PQ_event_reconnect_ (struct GNUNET_PQ_Context *db) 376GNUNET_PQ_event_reconnect_ (struct GNUNET_PQ_Context *db,
377 int fd)
399{ 378{
400 GNUNET_assert (0 == 379 if (! db->scheduler_on)
401 pthread_mutex_lock (&db->notify_lock)); 380 return;
381 scheduler_fd_cb (db,
382 fd);
402 GNUNET_CONTAINER_multishortmap_iterate (db->channel_map, 383 GNUNET_CONTAINER_multishortmap_iterate (db->channel_map,
403 &register_notify, 384 &register_notify,
404 db); 385 db);
405 GNUNET_assert (0 == 386}
406 pthread_mutex_unlock (&db->notify_lock)); 387
388
389/**
390 * Function run on timeout for an event. Triggers
391 * the notification, but does NOT clear the handler.
392 *
393 * @param cls a `struct GNUNET_DB_EventHandler *`
394 */
395static void
396event_timeout (void *cls)
397{
398 struct GNUNET_DB_EventHandler *eh = cls;
399
400 eh->timeout_task = NULL;
401 eh->cb (eh->cb_cls,
402 NULL,
403 0);
407} 404}
408 405
409 406
410struct GNUNET_DB_EventHandler * 407struct GNUNET_DB_EventHandler *
411GNUNET_PQ_event_listen (struct GNUNET_PQ_Context *db, 408GNUNET_PQ_event_listen (struct GNUNET_PQ_Context *db,
412 const struct GNUNET_DB_EventHeaderP *es, 409 const struct GNUNET_DB_EventHeaderP *es,
410 struct GNUNET_TIME_Relative timeout,
413 GNUNET_DB_EventCallback cb, 411 GNUNET_DB_EventCallback cb,
414 void *cb_cls) 412 void *cb_cls)
415{ 413{
@@ -422,28 +420,20 @@ GNUNET_PQ_event_listen (struct GNUNET_PQ_Context *db,
422 &eh->sh); 420 &eh->sh);
423 eh->cb = cb; 421 eh->cb = cb;
424 eh->cb_cls = cb_cls; 422 eh->cb_cls = cb_cls;
425 GNUNET_assert (0 ==
426 pthread_mutex_lock (&db->notify_lock));
427 was_zero = (0 == GNUNET_CONTAINER_multishortmap_size (db->channel_map)); 423 was_zero = (0 == GNUNET_CONTAINER_multishortmap_size (db->channel_map));
428 GNUNET_assert (GNUNET_OK == 424 GNUNET_assert (GNUNET_OK ==
429 GNUNET_CONTAINER_multishortmap_put (db->channel_map, 425 GNUNET_CONTAINER_multishortmap_put (db->channel_map,
430 &eh->sh, 426 &eh->sh,
431 eh, 427 eh,
432 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE)); 428 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
433 if ( (NULL != db->sc) && 429 if (was_zero)
434 was_zero) 430 GNUNET_PQ_event_scheduler_start_ (db);
435 {
436 int fd = PQsocket (db->conn);
437
438 if (-1 != fd)
439 db->sc (db->sc_cls,
440 fd);
441 }
442 manage_subscribe (db, 431 manage_subscribe (db,
443 "LISTEN X", 432 "LISTEN X",
444 eh); 433 eh);
445 GNUNET_assert (0 == 434 eh->timeout_task = GNUNET_SCHEDULER_add_delayed (timeout,
446 pthread_mutex_unlock (&db->notify_lock)); 435 &event_timeout,
436 eh);
447 return eh; 437 return eh;
448} 438}
449 439
@@ -453,8 +443,6 @@ GNUNET_PQ_event_listen_cancel (struct GNUNET_DB_EventHandler *eh)
453{ 443{
454 struct GNUNET_PQ_Context *db = eh->db; 444 struct GNUNET_PQ_Context *db = eh->db;
455 445
456 GNUNET_assert (0 ==
457 pthread_mutex_lock (&db->notify_lock));
458 GNUNET_assert (GNUNET_OK == 446 GNUNET_assert (GNUNET_OK ==
459 GNUNET_CONTAINER_multishortmap_remove (db->channel_map, 447 GNUNET_CONTAINER_multishortmap_remove (db->channel_map,
460 &eh->sh, 448 &eh->sh,
@@ -463,14 +451,15 @@ GNUNET_PQ_event_listen_cancel (struct GNUNET_DB_EventHandler *eh)
463 manage_subscribe (db, 451 manage_subscribe (db,
464 "UNLISTEN X", 452 "UNLISTEN X",
465 eh); 453 eh);
466 if ( (NULL != db->sc) && 454 if (0 == GNUNET_CONTAINER_multishortmap_size (db->channel_map))
467 (0 == GNUNET_CONTAINER_multishortmap_size (db->channel_map)) ) 455 {
456 GNUNET_PQ_event_scheduler_stop_ (db);
457 }
458 if (NULL != eh->timeout_task)
468 { 459 {
469 db->sc (db->sc_cls, 460 GNUNET_SCHEDULER_cancel (eh->timeout_task);
470 -1); 461 eh->timeout_task = NULL;
471 } 462 }
472 GNUNET_assert (0 ==
473 pthread_mutex_unlock (&db->notify_lock));
474 GNUNET_free (eh); 463 GNUNET_free (eh);
475} 464}
476 465