diff options
Diffstat (limited to 'src/pq/pq_event.c')
-rw-r--r-- | src/pq/pq_event.c | 109 |
1 files changed, 65 insertions, 44 deletions
diff --git a/src/pq/pq_event.c b/src/pq/pq_event.c index 3a0bfcde3..79a2e80c6 100644 --- a/src/pq/pq_event.c +++ b/src/pq/pq_event.c | |||
@@ -106,6 +106,24 @@ sh_to_channel (struct GNUNET_ShortHashCode *sh, | |||
106 | 106 | ||
107 | 107 | ||
108 | /** | 108 | /** |
109 | * Convert @a sh to a Postgres identifier. | ||
110 | * | ||
111 | * @param identifier to convert | ||
112 | * @param[out] sh set to short hash | ||
113 | * @return #GNUNET_OK on success | ||
114 | */ | ||
115 | static enum GNUNET_GenericReturnValue | ||
116 | channel_to_sh (const char *identifier, | ||
117 | struct GNUNET_ShortHashCode *sh) | ||
118 | { | ||
119 | return GNUNET_STRINGS_string_to_data (identifier, | ||
120 | strlen (identifier), | ||
121 | sh, | ||
122 | sizeof (*sh)); | ||
123 | } | ||
124 | |||
125 | |||
126 | /** | ||
109 | * Convert @a es to a Postgres identifier. | 127 | * Convert @a es to a Postgres identifier. |
110 | * | 128 | * |
111 | * @param es spec to hash to an identifier | 129 | * @param es spec to hash to an identifier |
@@ -167,11 +185,13 @@ do_notify (void *cls, | |||
167 | } | 185 | } |
168 | 186 | ||
169 | 187 | ||
170 | void | 188 | static void |
171 | GNUNET_PQ_event_do_poll (struct GNUNET_PQ_Context *db) | 189 | event_do_poll (struct GNUNET_PQ_Context *db) |
172 | { | 190 | { |
173 | PGnotify *n; | 191 | PGnotify *n; |
174 | 192 | ||
193 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, | ||
194 | "PG poll job active\n"); | ||
175 | if (1 != | 195 | if (1 != |
176 | PQconsumeInput (db->conn)) | 196 | PQconsumeInput (db->conn)) |
177 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, | 197 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, |
@@ -193,10 +213,8 @@ GNUNET_PQ_event_do_poll (struct GNUNET_PQ_Context *db) | |||
193 | continue; | 213 | continue; |
194 | } | 214 | } |
195 | if (GNUNET_OK != | 215 | if (GNUNET_OK != |
196 | GNUNET_STRINGS_string_to_data (&n->relname[1], | 216 | channel_to_sh (&n->relname[1], |
197 | strlen (&n->relname[1]), | 217 | &sh)) |
198 | &sh, | ||
199 | sizeof (sh))) | ||
200 | { | 218 | { |
201 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | 219 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, |
202 | "Ignoring notification for unsupported channel identifier `%s'\n", | 220 | "Ignoring notification for unsupported channel identifier `%s'\n", |
@@ -218,9 +236,15 @@ GNUNET_PQ_event_do_poll (struct GNUNET_PQ_Context *db) | |||
218 | PQfreemem (n); | 236 | PQfreemem (n); |
219 | continue; | 237 | continue; |
220 | } | 238 | } |
221 | GNUNET_CONTAINER_multishortmap_iterate (db->channel_map, | 239 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, |
222 | &do_notify, | 240 | "Received notification %s with extra data `%.*s'\n", |
223 | &ctx); | 241 | n->relname, |
242 | (int) ctx.extra_size, | ||
243 | (const char *) ctx.extra); | ||
244 | GNUNET_CONTAINER_multishortmap_get_multiple (db->channel_map, | ||
245 | &sh, | ||
246 | &do_notify, | ||
247 | &ctx); | ||
224 | GNUNET_free (ctx.extra); | 248 | GNUNET_free (ctx.extra); |
225 | PQfreemem (n); | 249 | PQfreemem (n); |
226 | } | 250 | } |
@@ -238,9 +262,11 @@ do_scheduler_notify (void *cls) | |||
238 | { | 262 | { |
239 | struct GNUNET_PQ_Context *db = cls; | 263 | struct GNUNET_PQ_Context *db = cls; |
240 | 264 | ||
241 | GNUNET_assert (db->scheduler_on); | 265 | db->event_task = NULL; |
242 | GNUNET_assert (NULL != db->rfd); | 266 | GNUNET_assert (NULL != db->rfd); |
243 | GNUNET_PQ_event_do_poll (db); | 267 | event_do_poll (db); |
268 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, | ||
269 | "Resubscribing\n"); | ||
244 | db->event_task | 270 | db->event_task |
245 | = GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_FOREVER_REL, | 271 | = GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_FOREVER_REL, |
246 | db->rfd, | 272 | db->rfd, |
@@ -262,6 +288,9 @@ scheduler_fd_cb (void *cls, | |||
262 | { | 288 | { |
263 | struct GNUNET_PQ_Context *db = cls; | 289 | struct GNUNET_PQ_Context *db = cls; |
264 | 290 | ||
291 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, | ||
292 | "New poll FD is %d\n", | ||
293 | fd); | ||
265 | if (NULL != db->event_task) | 294 | if (NULL != db->event_task) |
266 | { | 295 | { |
267 | GNUNET_SCHEDULER_cancel (db->event_task); | 296 | GNUNET_SCHEDULER_cancel (db->event_task); |
@@ -272,6 +301,9 @@ scheduler_fd_cb (void *cls, | |||
272 | return; | 301 | return; |
273 | if (0 == GNUNET_CONTAINER_multishortmap_size (db->channel_map)) | 302 | if (0 == GNUNET_CONTAINER_multishortmap_size (db->channel_map)) |
274 | return; | 303 | return; |
304 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, | ||
305 | "Activating poll job on %d\n", | ||
306 | fd); | ||
275 | db->rfd = GNUNET_NETWORK_socket_box_native (fd); | 307 | db->rfd = GNUNET_NETWORK_socket_box_native (fd); |
276 | db->event_task | 308 | db->event_task |
277 | = GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_ZERO, | 309 | = GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_ZERO, |
@@ -281,31 +313,6 @@ scheduler_fd_cb (void *cls, | |||
281 | } | 313 | } |
282 | 314 | ||
283 | 315 | ||
284 | void | ||
285 | GNUNET_PQ_event_scheduler_start_ (struct GNUNET_PQ_Context *db) | ||
286 | { | ||
287 | if (db->scheduler_on) | ||
288 | return; | ||
289 | db->scheduler_on = true; | ||
290 | scheduler_fd_cb (db, | ||
291 | PQsocket (db->conn)); | ||
292 | } | ||
293 | |||
294 | |||
295 | void | ||
296 | GNUNET_PQ_event_scheduler_stop_ (struct GNUNET_PQ_Context *db) | ||
297 | { | ||
298 | GNUNET_assert (db->scheduler_on); | ||
299 | GNUNET_free (db->rfd); | ||
300 | db->scheduler_on = false; | ||
301 | if (NULL != db->event_task) | ||
302 | { | ||
303 | GNUNET_SCHEDULER_cancel (db->event_task); | ||
304 | db->event_task = NULL; | ||
305 | } | ||
306 | } | ||
307 | |||
308 | |||
309 | /** | 316 | /** |
310 | * Helper function to trigger an SQL @a cmd on @a db | 317 | * Helper function to trigger an SQL @a cmd on @a db |
311 | * | 318 | * |
@@ -376,8 +383,9 @@ void | |||
376 | GNUNET_PQ_event_reconnect_ (struct GNUNET_PQ_Context *db, | 383 | GNUNET_PQ_event_reconnect_ (struct GNUNET_PQ_Context *db, |
377 | int fd) | 384 | int fd) |
378 | { | 385 | { |
379 | if (! db->scheduler_on) | 386 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, |
380 | return; | 387 | "Change in PQ event FD to %d\n", |
388 | fd); | ||
381 | scheduler_fd_cb (db, | 389 | scheduler_fd_cb (db, |
382 | fd); | 390 | fd); |
383 | GNUNET_CONTAINER_multishortmap_iterate (db->channel_map, | 391 | GNUNET_CONTAINER_multishortmap_iterate (db->channel_map, |
@@ -412,7 +420,6 @@ GNUNET_PQ_event_listen (struct GNUNET_PQ_Context *db, | |||
412 | void *cb_cls) | 420 | void *cb_cls) |
413 | { | 421 | { |
414 | struct GNUNET_DB_EventHandler *eh; | 422 | struct GNUNET_DB_EventHandler *eh; |
415 | bool was_zero; | ||
416 | 423 | ||
417 | eh = GNUNET_new (struct GNUNET_DB_EventHandler); | 424 | eh = GNUNET_new (struct GNUNET_DB_EventHandler); |
418 | eh->db = db; | 425 | eh->db = db; |
@@ -420,14 +427,18 @@ GNUNET_PQ_event_listen (struct GNUNET_PQ_Context *db, | |||
420 | &eh->sh); | 427 | &eh->sh); |
421 | eh->cb = cb; | 428 | eh->cb = cb; |
422 | eh->cb_cls = cb_cls; | 429 | eh->cb_cls = cb_cls; |
423 | was_zero = (0 == GNUNET_CONTAINER_multishortmap_size (db->channel_map)); | ||
424 | GNUNET_assert (GNUNET_OK == | 430 | GNUNET_assert (GNUNET_OK == |
425 | GNUNET_CONTAINER_multishortmap_put (db->channel_map, | 431 | GNUNET_CONTAINER_multishortmap_put (db->channel_map, |
426 | &eh->sh, | 432 | &eh->sh, |
427 | eh, | 433 | eh, |
428 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE)); | 434 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE)); |
429 | if (was_zero) | 435 | if (NULL == db->event_task) |
430 | GNUNET_PQ_event_scheduler_start_ (db); | 436 | { |
437 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, | ||
438 | "Starting event scheduler\n"); | ||
439 | scheduler_fd_cb (db, | ||
440 | PQsocket (db->conn)); | ||
441 | } | ||
431 | manage_subscribe (db, | 442 | manage_subscribe (db, |
432 | "LISTEN X", | 443 | "LISTEN X", |
433 | eh); | 444 | eh); |
@@ -447,13 +458,19 @@ GNUNET_PQ_event_listen_cancel (struct GNUNET_DB_EventHandler *eh) | |||
447 | GNUNET_CONTAINER_multishortmap_remove (db->channel_map, | 458 | GNUNET_CONTAINER_multishortmap_remove (db->channel_map, |
448 | &eh->sh, | 459 | &eh->sh, |
449 | eh)); | 460 | eh)); |
450 | |||
451 | manage_subscribe (db, | 461 | manage_subscribe (db, |
452 | "UNLISTEN X", | 462 | "UNLISTEN X", |
453 | eh); | 463 | eh); |
454 | if (0 == GNUNET_CONTAINER_multishortmap_size (db->channel_map)) | 464 | if (0 == GNUNET_CONTAINER_multishortmap_size (db->channel_map)) |
455 | { | 465 | { |
456 | GNUNET_PQ_event_scheduler_stop_ (db); | 466 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, |
467 | "Stopping PQ event scheduler job\n"); | ||
468 | GNUNET_free (db->rfd); | ||
469 | if (NULL != db->event_task) | ||
470 | { | ||
471 | GNUNET_SCHEDULER_cancel (db->event_task); | ||
472 | db->event_task = NULL; | ||
473 | } | ||
457 | } | 474 | } |
458 | if (NULL != eh->timeout_task) | 475 | if (NULL != eh->timeout_task) |
459 | { | 476 | { |
@@ -488,6 +505,9 @@ GNUNET_PQ_event_notify (struct GNUNET_PQ_Context *db, | |||
488 | *end = '\0'; | 505 | *end = '\0'; |
489 | end = stpcpy (end, | 506 | end = stpcpy (end, |
490 | "'"); | 507 | "'"); |
508 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, | ||
509 | "Executing command `%s'\n", | ||
510 | sql); | ||
491 | result = PQexec (db->conn, | 511 | result = PQexec (db->conn, |
492 | sql); | 512 | sql); |
493 | if (PGRES_COMMAND_OK != PQresultStatus (result)) | 513 | if (PGRES_COMMAND_OK != PQresultStatus (result)) |
@@ -505,6 +525,7 @@ GNUNET_PQ_event_notify (struct GNUNET_PQ_Context *db, | |||
505 | PQerrorMessage (db->conn)); | 525 | PQerrorMessage (db->conn)); |
506 | } | 526 | } |
507 | PQclear (result); | 527 | PQclear (result); |
528 | event_do_poll (db); | ||
508 | } | 529 | } |
509 | 530 | ||
510 | 531 | ||