diff options
Diffstat (limited to 'src/pq/pq_event.c')
-rw-r--r-- | src/pq/pq_event.c | 145 |
1 files changed, 67 insertions, 78 deletions
diff --git a/src/pq/pq_event.c b/src/pq/pq_event.c index 2890869a3..3a0bfcde3 100644 --- a/src/pq/pq_event.c +++ b/src/pq/pq_event.c | |||
@@ -51,6 +51,11 @@ struct GNUNET_DB_EventHandler | |||
51 | * Database context this event handler is with. | 51 | * Database context this event handler is with. |
52 | */ | 52 | */ |
53 | struct GNUNET_PQ_Context *db; | 53 | struct GNUNET_PQ_Context *db; |
54 | |||
55 | /** | ||
56 | * Task to run on timeout. | ||
57 | */ | ||
58 | struct GNUNET_SCHEDULER_Task *timeout_task; | ||
54 | }; | 59 | }; |
55 | 60 | ||
56 | 61 | ||
@@ -163,35 +168,10 @@ do_notify (void *cls, | |||
163 | 168 | ||
164 | 169 | ||
165 | void | 170 | void |
166 | GNUNET_PQ_event_set_socket_callback (struct GNUNET_PQ_Context *db, | ||
167 | GNUNET_PQ_SocketCallback sc, | ||
168 | void *sc_cls) | ||
169 | { | ||
170 | int fd; | ||
171 | |||
172 | db->sc = sc; | ||
173 | db->sc_cls = sc_cls; | ||
174 | if (NULL == sc) | ||
175 | return; | ||
176 | GNUNET_assert (0 == | ||
177 | pthread_mutex_lock (&db->notify_lock)); | ||
178 | fd = PQsocket (db->conn); | ||
179 | if ( (-1 != fd) && | ||
180 | (0 != GNUNET_CONTAINER_multishortmap_size (db->channel_map)) ) | ||
181 | sc (sc_cls, | ||
182 | fd); | ||
183 | GNUNET_assert (0 == | ||
184 | pthread_mutex_unlock (&db->notify_lock)); | ||
185 | } | ||
186 | |||
187 | |||
188 | void | ||
189 | GNUNET_PQ_event_do_poll (struct GNUNET_PQ_Context *db) | 171 | GNUNET_PQ_event_do_poll (struct GNUNET_PQ_Context *db) |
190 | { | 172 | { |
191 | PGnotify *n; | 173 | PGnotify *n; |
192 | 174 | ||
193 | GNUNET_assert (0 == | ||
194 | pthread_mutex_lock (&db->notify_lock)); | ||
195 | if (1 != | 175 | if (1 != |
196 | PQconsumeInput (db->conn)) | 176 | PQconsumeInput (db->conn)) |
197 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, | 177 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, |
@@ -204,9 +184,17 @@ GNUNET_PQ_event_do_poll (struct GNUNET_PQ_Context *db) | |||
204 | .extra = NULL | 184 | .extra = NULL |
205 | }; | 185 | }; |
206 | 186 | ||
187 | if ('X' != toupper ((int) n->relname[0])) | ||
188 | { | ||
189 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | ||
190 | "Ignoring notification for unsupported channel identifier `%s'\n", | ||
191 | n->relname); | ||
192 | PQfreemem (n); | ||
193 | continue; | ||
194 | } | ||
207 | if (GNUNET_OK != | 195 | if (GNUNET_OK != |
208 | GNUNET_STRINGS_string_to_data (n->relname, | 196 | GNUNET_STRINGS_string_to_data (&n->relname[1], |
209 | strlen (n->relname), | 197 | strlen (&n->relname[1]), |
210 | &sh, | 198 | &sh, |
211 | sizeof (sh))) | 199 | sizeof (sh))) |
212 | { | 200 | { |
@@ -236,24 +224,10 @@ GNUNET_PQ_event_do_poll (struct GNUNET_PQ_Context *db) | |||
236 | GNUNET_free (ctx.extra); | 224 | GNUNET_free (ctx.extra); |
237 | PQfreemem (n); | 225 | PQfreemem (n); |
238 | } | 226 | } |
239 | GNUNET_assert (0 == | ||
240 | pthread_mutex_unlock (&db->notify_lock)); | ||
241 | } | 227 | } |
242 | 228 | ||
243 | 229 | ||
244 | /** | 230 | /** |
245 | * Function called when the Postgres FD changes and we need | ||
246 | * to update the scheduler event loop task. | ||
247 | * | ||
248 | * @param cls a `struct GNUNET_PQ_Context *` | ||
249 | * @param fd the file descriptor, possibly -1 | ||
250 | */ | ||
251 | static void | ||
252 | scheduler_fd_cb (void *cls, | ||
253 | int fd); | ||
254 | |||
255 | |||
256 | /** | ||
257 | * The GNUnet scheduler notifies us that we need to | 231 | * The GNUnet scheduler notifies us that we need to |
258 | * trigger the DB event poller. | 232 | * trigger the DB event poller. |
259 | * | 233 | * |
@@ -308,27 +282,21 @@ scheduler_fd_cb (void *cls, | |||
308 | 282 | ||
309 | 283 | ||
310 | void | 284 | void |
311 | GNUNET_PQ_event_scheduler_start (struct GNUNET_PQ_Context *db) | 285 | GNUNET_PQ_event_scheduler_start_ (struct GNUNET_PQ_Context *db) |
312 | { | 286 | { |
313 | int fd; | 287 | if (db->scheduler_on) |
314 | 288 | return; | |
315 | GNUNET_assert (! db->scheduler_on); | ||
316 | GNUNET_assert (NULL == db->sc); | ||
317 | db->scheduler_on = true; | 289 | db->scheduler_on = true; |
318 | db->sc = &scheduler_fd_cb; | ||
319 | db->sc_cls = db; | ||
320 | fd = PQsocket (db->conn); | ||
321 | scheduler_fd_cb (db, | 290 | scheduler_fd_cb (db, |
322 | fd); | 291 | PQsocket (db->conn)); |
323 | } | 292 | } |
324 | 293 | ||
325 | 294 | ||
326 | void | 295 | void |
327 | GNUNET_PQ_event_scheduler_stop (struct GNUNET_PQ_Context *db) | 296 | GNUNET_PQ_event_scheduler_stop_ (struct GNUNET_PQ_Context *db) |
328 | { | 297 | { |
329 | GNUNET_assert (db->scheduler_on); | 298 | GNUNET_assert (db->scheduler_on); |
330 | GNUNET_free (db->rfd); | 299 | GNUNET_free (db->rfd); |
331 | db->sc = NULL; | ||
332 | db->scheduler_on = false; | 300 | db->scheduler_on = false; |
333 | if (NULL != db->event_task) | 301 | if (NULL != db->event_task) |
334 | { | 302 | { |
@@ -338,6 +306,13 @@ GNUNET_PQ_event_scheduler_stop (struct GNUNET_PQ_Context *db) | |||
338 | } | 306 | } |
339 | 307 | ||
340 | 308 | ||
309 | /** | ||
310 | * Helper function to trigger an SQL @a cmd on @a db | ||
311 | * | ||
312 | * @param db database to send command to | ||
313 | * @param cmd prefix of the command to send | ||
314 | * @param eh details about the event | ||
315 | */ | ||
341 | static void | 316 | static void |
342 | manage_subscribe (struct GNUNET_PQ_Context *db, | 317 | manage_subscribe (struct GNUNET_PQ_Context *db, |
343 | const char *cmd, | 318 | const char *cmd, |
@@ -351,6 +326,9 @@ manage_subscribe (struct GNUNET_PQ_Context *db, | |||
351 | cmd); | 326 | cmd); |
352 | end = sh_to_channel (&eh->sh, | 327 | end = sh_to_channel (&eh->sh, |
353 | end); | 328 | end); |
329 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, | ||
330 | "Executing PQ command `%s'\n", | ||
331 | sql); | ||
354 | result = PQexec (db->conn, | 332 | result = PQexec (db->conn, |
355 | sql); | 333 | sql); |
356 | if (PGRES_COMMAND_OK != PQresultStatus (result)) | 334 | if (PGRES_COMMAND_OK != PQresultStatus (result)) |
@@ -395,21 +373,41 @@ register_notify (void *cls, | |||
395 | 373 | ||
396 | 374 | ||
397 | void | 375 | void |
398 | GNUNET_PQ_event_reconnect_ (struct GNUNET_PQ_Context *db) | 376 | GNUNET_PQ_event_reconnect_ (struct GNUNET_PQ_Context *db, |
377 | int fd) | ||
399 | { | 378 | { |
400 | GNUNET_assert (0 == | 379 | if (! db->scheduler_on) |
401 | pthread_mutex_lock (&db->notify_lock)); | 380 | return; |
381 | scheduler_fd_cb (db, | ||
382 | fd); | ||
402 | GNUNET_CONTAINER_multishortmap_iterate (db->channel_map, | 383 | GNUNET_CONTAINER_multishortmap_iterate (db->channel_map, |
403 | ®ister_notify, | 384 | ®ister_notify, |
404 | db); | 385 | db); |
405 | GNUNET_assert (0 == | 386 | } |
406 | pthread_mutex_unlock (&db->notify_lock)); | 387 | |
388 | |||
389 | /** | ||
390 | * Function run on timeout for an event. Triggers | ||
391 | * the notification, but does NOT clear the handler. | ||
392 | * | ||
393 | * @param cls a `struct GNUNET_DB_EventHandler *` | ||
394 | */ | ||
395 | static void | ||
396 | event_timeout (void *cls) | ||
397 | { | ||
398 | struct GNUNET_DB_EventHandler *eh = cls; | ||
399 | |||
400 | eh->timeout_task = NULL; | ||
401 | eh->cb (eh->cb_cls, | ||
402 | NULL, | ||
403 | 0); | ||
407 | } | 404 | } |
408 | 405 | ||
409 | 406 | ||
410 | struct GNUNET_DB_EventHandler * | 407 | struct GNUNET_DB_EventHandler * |
411 | GNUNET_PQ_event_listen (struct GNUNET_PQ_Context *db, | 408 | GNUNET_PQ_event_listen (struct GNUNET_PQ_Context *db, |
412 | const struct GNUNET_DB_EventHeaderP *es, | 409 | const struct GNUNET_DB_EventHeaderP *es, |
410 | struct GNUNET_TIME_Relative timeout, | ||
413 | GNUNET_DB_EventCallback cb, | 411 | GNUNET_DB_EventCallback cb, |
414 | void *cb_cls) | 412 | void *cb_cls) |
415 | { | 413 | { |
@@ -422,28 +420,20 @@ GNUNET_PQ_event_listen (struct GNUNET_PQ_Context *db, | |||
422 | &eh->sh); | 420 | &eh->sh); |
423 | eh->cb = cb; | 421 | eh->cb = cb; |
424 | eh->cb_cls = cb_cls; | 422 | eh->cb_cls = cb_cls; |
425 | GNUNET_assert (0 == | ||
426 | pthread_mutex_lock (&db->notify_lock)); | ||
427 | was_zero = (0 == GNUNET_CONTAINER_multishortmap_size (db->channel_map)); | 423 | was_zero = (0 == GNUNET_CONTAINER_multishortmap_size (db->channel_map)); |
428 | GNUNET_assert (GNUNET_OK == | 424 | GNUNET_assert (GNUNET_OK == |
429 | GNUNET_CONTAINER_multishortmap_put (db->channel_map, | 425 | GNUNET_CONTAINER_multishortmap_put (db->channel_map, |
430 | &eh->sh, | 426 | &eh->sh, |
431 | eh, | 427 | eh, |
432 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE)); | 428 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE)); |
433 | if ( (NULL != db->sc) && | 429 | if (was_zero) |
434 | was_zero) | 430 | GNUNET_PQ_event_scheduler_start_ (db); |
435 | { | ||
436 | int fd = PQsocket (db->conn); | ||
437 | |||
438 | if (-1 != fd) | ||
439 | db->sc (db->sc_cls, | ||
440 | fd); | ||
441 | } | ||
442 | manage_subscribe (db, | 431 | manage_subscribe (db, |
443 | "LISTEN X", | 432 | "LISTEN X", |
444 | eh); | 433 | eh); |
445 | GNUNET_assert (0 == | 434 | eh->timeout_task = GNUNET_SCHEDULER_add_delayed (timeout, |
446 | pthread_mutex_unlock (&db->notify_lock)); | 435 | &event_timeout, |
436 | eh); | ||
447 | return eh; | 437 | return eh; |
448 | } | 438 | } |
449 | 439 | ||
@@ -453,8 +443,6 @@ GNUNET_PQ_event_listen_cancel (struct GNUNET_DB_EventHandler *eh) | |||
453 | { | 443 | { |
454 | struct GNUNET_PQ_Context *db = eh->db; | 444 | struct GNUNET_PQ_Context *db = eh->db; |
455 | 445 | ||
456 | GNUNET_assert (0 == | ||
457 | pthread_mutex_lock (&db->notify_lock)); | ||
458 | GNUNET_assert (GNUNET_OK == | 446 | GNUNET_assert (GNUNET_OK == |
459 | GNUNET_CONTAINER_multishortmap_remove (db->channel_map, | 447 | GNUNET_CONTAINER_multishortmap_remove (db->channel_map, |
460 | &eh->sh, | 448 | &eh->sh, |
@@ -463,14 +451,15 @@ GNUNET_PQ_event_listen_cancel (struct GNUNET_DB_EventHandler *eh) | |||
463 | manage_subscribe (db, | 451 | manage_subscribe (db, |
464 | "UNLISTEN X", | 452 | "UNLISTEN X", |
465 | eh); | 453 | eh); |
466 | if ( (NULL != db->sc) && | 454 | if (0 == GNUNET_CONTAINER_multishortmap_size (db->channel_map)) |
467 | (0 == GNUNET_CONTAINER_multishortmap_size (db->channel_map)) ) | 455 | { |
456 | GNUNET_PQ_event_scheduler_stop_ (db); | ||
457 | } | ||
458 | if (NULL != eh->timeout_task) | ||
468 | { | 459 | { |
469 | db->sc (db->sc_cls, | 460 | GNUNET_SCHEDULER_cancel (eh->timeout_task); |
470 | -1); | 461 | eh->timeout_task = NULL; |
471 | } | 462 | } |
472 | GNUNET_assert (0 == | ||
473 | pthread_mutex_unlock (&db->notify_lock)); | ||
474 | GNUNET_free (eh); | 463 | GNUNET_free (eh); |
475 | } | 464 | } |
476 | 465 | ||