diff options
Diffstat (limited to 'src/pq')
-rw-r--r-- | src/pq/pq.h | 40 | ||||
-rw-r--r-- | src/pq/pq_connect.c | 17 | ||||
-rw-r--r-- | src/pq/pq_event.c | 145 | ||||
-rw-r--r-- | src/pq/test_pq.c | 64 |
4 files changed, 96 insertions, 170 deletions
diff --git a/src/pq/pq.h b/src/pq/pq.h index 107fd116c..950d38220 100644 --- a/src/pq/pq.h +++ b/src/pq/pq.h | |||
@@ -28,6 +28,7 @@ | |||
28 | #include "gnunet_util_lib.h" | 28 | #include "gnunet_util_lib.h" |
29 | #include "gnunet_pq_lib.h" | 29 | #include "gnunet_pq_lib.h" |
30 | 30 | ||
31 | |||
31 | /** | 32 | /** |
32 | * Handle to Postgres database. | 33 | * Handle to Postgres database. |
33 | */ | 34 | */ |
@@ -59,26 +60,11 @@ struct GNUNET_PQ_Context | |||
59 | char *load_path; | 60 | char *load_path; |
60 | 61 | ||
61 | /** | 62 | /** |
62 | * Function to call on Postgres FDs. | ||
63 | */ | ||
64 | GNUNET_PQ_SocketCallback sc; | ||
65 | |||
66 | /** | ||
67 | * Closure for @e sc. | ||
68 | */ | ||
69 | void *sc_cls; | ||
70 | |||
71 | /** | ||
72 | * Map managing event subscriptions. | 63 | * Map managing event subscriptions. |
73 | */ | 64 | */ |
74 | struct GNUNET_CONTAINER_MultiShortmap *channel_map; | 65 | struct GNUNET_CONTAINER_MultiShortmap *channel_map; |
75 | 66 | ||
76 | /** | 67 | /** |
77 | * Lock to access @e channel_map. | ||
78 | */ | ||
79 | pthread_mutex_t notify_lock; | ||
80 | |||
81 | /** | ||
82 | * Task responsible for processing events. | 68 | * Task responsible for processing events. |
83 | */ | 69 | */ |
84 | struct GNUNET_SCHEDULER_Task *event_task; | 70 | struct GNUNET_SCHEDULER_Task *event_task; |
@@ -87,7 +73,7 @@ struct GNUNET_PQ_Context | |||
87 | * File descriptor wrapper for @e event_task. | 73 | * File descriptor wrapper for @e event_task. |
88 | */ | 74 | */ |
89 | struct GNUNET_NETWORK_Handle *rfd; | 75 | struct GNUNET_NETWORK_Handle *rfd; |
90 | 76 | ||
91 | /** | 77 | /** |
92 | * Is scheduling via the GNUnet scheduler desired? | 78 | * Is scheduling via the GNUnet scheduler desired? |
93 | */ | 79 | */ |
@@ -100,9 +86,29 @@ struct GNUNET_PQ_Context | |||
100 | * after a disconnect. | 86 | * after a disconnect. |
101 | * | 87 | * |
102 | * @param db the DB handle | 88 | * @param db the DB handle |
89 | * @param fd socket to listen on | ||
90 | */ | ||
91 | void | ||
92 | GNUNET_PQ_event_reconnect_ (struct GNUNET_PQ_Context *db, | ||
93 | int fd); | ||
94 | |||
95 | |||
96 | /** | ||
97 | * Run poll event loop using the GNUnet scheduler. | ||
98 | * | ||
99 | * @param db database handle | ||
100 | */ | ||
101 | void | ||
102 | GNUNET_PQ_event_scheduler_start_ (struct GNUNET_PQ_Context *db); | ||
103 | |||
104 | |||
105 | /** | ||
106 | * Stop running poll event loop using the GNUnet scheduler. | ||
107 | * | ||
108 | * @param db database handle | ||
103 | */ | 109 | */ |
104 | void | 110 | void |
105 | GNUNET_PQ_event_reconnect_ (struct GNUNET_PQ_Context *db); | 111 | GNUNET_PQ_event_scheduler_stop_ (struct GNUNET_PQ_Context *db); |
106 | 112 | ||
107 | 113 | ||
108 | #endif | 114 | #endif |
diff --git a/src/pq/pq_connect.c b/src/pq/pq_connect.c index 275fd7450..05e787939 100644 --- a/src/pq/pq_connect.c +++ b/src/pq/pq_connect.c | |||
@@ -103,9 +103,6 @@ GNUNET_PQ_connect (const char *config_str, | |||
103 | } | 103 | } |
104 | db->channel_map = GNUNET_CONTAINER_multishortmap_create (16, | 104 | db->channel_map = GNUNET_CONTAINER_multishortmap_create (16, |
105 | GNUNET_YES); | 105 | GNUNET_YES); |
106 | GNUNET_assert (0 == | ||
107 | pthread_mutex_init (&db->notify_lock, | ||
108 | NULL)); | ||
109 | GNUNET_PQ_reconnect (db); | 106 | GNUNET_PQ_reconnect (db); |
110 | if (NULL == db->conn) | 107 | if (NULL == db->conn) |
111 | { | 108 | { |
@@ -294,9 +291,8 @@ GNUNET_PQ_reconnect_if_down (struct GNUNET_PQ_Context *db) | |||
294 | void | 291 | void |
295 | GNUNET_PQ_reconnect (struct GNUNET_PQ_Context *db) | 292 | GNUNET_PQ_reconnect (struct GNUNET_PQ_Context *db) |
296 | { | 293 | { |
297 | if (NULL != db->sc) | 294 | GNUNET_PQ_event_reconnect_ (db, |
298 | db->sc (db->sc_cls, | 295 | -1); |
299 | -1); | ||
300 | if (NULL != db->conn) | 296 | if (NULL != db->conn) |
301 | PQfinish (db->conn); | 297 | PQfinish (db->conn); |
302 | db->conn = PQconnectdb (db->config_str); | 298 | db->conn = PQconnectdb (db->config_str); |
@@ -416,11 +412,8 @@ GNUNET_PQ_reconnect (struct GNUNET_PQ_Context *db) | |||
416 | db->conn = NULL; | 412 | db->conn = NULL; |
417 | return; | 413 | return; |
418 | } | 414 | } |
419 | GNUNET_PQ_event_reconnect_ (db); | 415 | GNUNET_PQ_event_reconnect_ (db, |
420 | if ( (NULL != db->sc) && | 416 | PQsocket (db->conn)); |
421 | (0 != GNUNET_CONTAINER_multishortmap_size (db->channel_map)) ) | ||
422 | db->sc (db->sc_cls, | ||
423 | PQsocket (db->conn)); | ||
424 | } | 417 | } |
425 | 418 | ||
426 | 419 | ||
@@ -473,8 +466,6 @@ GNUNET_PQ_disconnect (struct GNUNET_PQ_Context *db) | |||
473 | GNUNET_assert (0 == | 466 | GNUNET_assert (0 == |
474 | GNUNET_CONTAINER_multishortmap_size (db->channel_map)); | 467 | GNUNET_CONTAINER_multishortmap_size (db->channel_map)); |
475 | GNUNET_CONTAINER_multishortmap_destroy (db->channel_map); | 468 | GNUNET_CONTAINER_multishortmap_destroy (db->channel_map); |
476 | GNUNET_assert (0 == | ||
477 | pthread_mutex_destroy (&db->notify_lock)); | ||
478 | GNUNET_free (db->es); | 469 | GNUNET_free (db->es); |
479 | GNUNET_free (db->ps); | 470 | GNUNET_free (db->ps); |
480 | GNUNET_free (db->load_path); | 471 | GNUNET_free (db->load_path); |
diff --git a/src/pq/pq_event.c b/src/pq/pq_event.c index 2890869a3..3a0bfcde3 100644 --- a/src/pq/pq_event.c +++ b/src/pq/pq_event.c | |||
@@ -51,6 +51,11 @@ struct GNUNET_DB_EventHandler | |||
51 | * Database context this event handler is with. | 51 | * Database context this event handler is with. |
52 | */ | 52 | */ |
53 | struct GNUNET_PQ_Context *db; | 53 | struct GNUNET_PQ_Context *db; |
54 | |||
55 | /** | ||
56 | * Task to run on timeout. | ||
57 | */ | ||
58 | struct GNUNET_SCHEDULER_Task *timeout_task; | ||
54 | }; | 59 | }; |
55 | 60 | ||
56 | 61 | ||
@@ -163,35 +168,10 @@ do_notify (void *cls, | |||
163 | 168 | ||
164 | 169 | ||
165 | void | 170 | void |
166 | GNUNET_PQ_event_set_socket_callback (struct GNUNET_PQ_Context *db, | ||
167 | GNUNET_PQ_SocketCallback sc, | ||
168 | void *sc_cls) | ||
169 | { | ||
170 | int fd; | ||
171 | |||
172 | db->sc = sc; | ||
173 | db->sc_cls = sc_cls; | ||
174 | if (NULL == sc) | ||
175 | return; | ||
176 | GNUNET_assert (0 == | ||
177 | pthread_mutex_lock (&db->notify_lock)); | ||
178 | fd = PQsocket (db->conn); | ||
179 | if ( (-1 != fd) && | ||
180 | (0 != GNUNET_CONTAINER_multishortmap_size (db->channel_map)) ) | ||
181 | sc (sc_cls, | ||
182 | fd); | ||
183 | GNUNET_assert (0 == | ||
184 | pthread_mutex_unlock (&db->notify_lock)); | ||
185 | } | ||
186 | |||
187 | |||
188 | void | ||
189 | GNUNET_PQ_event_do_poll (struct GNUNET_PQ_Context *db) | 171 | GNUNET_PQ_event_do_poll (struct GNUNET_PQ_Context *db) |
190 | { | 172 | { |
191 | PGnotify *n; | 173 | PGnotify *n; |
192 | 174 | ||
193 | GNUNET_assert (0 == | ||
194 | pthread_mutex_lock (&db->notify_lock)); | ||
195 | if (1 != | 175 | if (1 != |
196 | PQconsumeInput (db->conn)) | 176 | PQconsumeInput (db->conn)) |
197 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, | 177 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, |
@@ -204,9 +184,17 @@ GNUNET_PQ_event_do_poll (struct GNUNET_PQ_Context *db) | |||
204 | .extra = NULL | 184 | .extra = NULL |
205 | }; | 185 | }; |
206 | 186 | ||
187 | if ('X' != toupper ((int) n->relname[0])) | ||
188 | { | ||
189 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | ||
190 | "Ignoring notification for unsupported channel identifier `%s'\n", | ||
191 | n->relname); | ||
192 | PQfreemem (n); | ||
193 | continue; | ||
194 | } | ||
207 | if (GNUNET_OK != | 195 | if (GNUNET_OK != |
208 | GNUNET_STRINGS_string_to_data (n->relname, | 196 | GNUNET_STRINGS_string_to_data (&n->relname[1], |
209 | strlen (n->relname), | 197 | strlen (&n->relname[1]), |
210 | &sh, | 198 | &sh, |
211 | sizeof (sh))) | 199 | sizeof (sh))) |
212 | { | 200 | { |
@@ -236,24 +224,10 @@ GNUNET_PQ_event_do_poll (struct GNUNET_PQ_Context *db) | |||
236 | GNUNET_free (ctx.extra); | 224 | GNUNET_free (ctx.extra); |
237 | PQfreemem (n); | 225 | PQfreemem (n); |
238 | } | 226 | } |
239 | GNUNET_assert (0 == | ||
240 | pthread_mutex_unlock (&db->notify_lock)); | ||
241 | } | 227 | } |
242 | 228 | ||
243 | 229 | ||
244 | /** | 230 | /** |
245 | * Function called when the Postgres FD changes and we need | ||
246 | * to update the scheduler event loop task. | ||
247 | * | ||
248 | * @param cls a `struct GNUNET_PQ_Context *` | ||
249 | * @param fd the file descriptor, possibly -1 | ||
250 | */ | ||
251 | static void | ||
252 | scheduler_fd_cb (void *cls, | ||
253 | int fd); | ||
254 | |||
255 | |||
256 | /** | ||
257 | * The GNUnet scheduler notifies us that we need to | 231 | * The GNUnet scheduler notifies us that we need to |
258 | * trigger the DB event poller. | 232 | * trigger the DB event poller. |
259 | * | 233 | * |
@@ -308,27 +282,21 @@ scheduler_fd_cb (void *cls, | |||
308 | 282 | ||
309 | 283 | ||
310 | void | 284 | void |
311 | GNUNET_PQ_event_scheduler_start (struct GNUNET_PQ_Context *db) | 285 | GNUNET_PQ_event_scheduler_start_ (struct GNUNET_PQ_Context *db) |
312 | { | 286 | { |
313 | int fd; | 287 | if (db->scheduler_on) |
314 | 288 | return; | |
315 | GNUNET_assert (! db->scheduler_on); | ||
316 | GNUNET_assert (NULL == db->sc); | ||
317 | db->scheduler_on = true; | 289 | db->scheduler_on = true; |
318 | db->sc = &scheduler_fd_cb; | ||
319 | db->sc_cls = db; | ||
320 | fd = PQsocket (db->conn); | ||
321 | scheduler_fd_cb (db, | 290 | scheduler_fd_cb (db, |
322 | fd); | 291 | PQsocket (db->conn)); |
323 | } | 292 | } |
324 | 293 | ||
325 | 294 | ||
326 | void | 295 | void |
327 | GNUNET_PQ_event_scheduler_stop (struct GNUNET_PQ_Context *db) | 296 | GNUNET_PQ_event_scheduler_stop_ (struct GNUNET_PQ_Context *db) |
328 | { | 297 | { |
329 | GNUNET_assert (db->scheduler_on); | 298 | GNUNET_assert (db->scheduler_on); |
330 | GNUNET_free (db->rfd); | 299 | GNUNET_free (db->rfd); |
331 | db->sc = NULL; | ||
332 | db->scheduler_on = false; | 300 | db->scheduler_on = false; |
333 | if (NULL != db->event_task) | 301 | if (NULL != db->event_task) |
334 | { | 302 | { |
@@ -338,6 +306,13 @@ GNUNET_PQ_event_scheduler_stop (struct GNUNET_PQ_Context *db) | |||
338 | } | 306 | } |
339 | 307 | ||
340 | 308 | ||
309 | /** | ||
310 | * Helper function to trigger an SQL @a cmd on @a db | ||
311 | * | ||
312 | * @param db database to send command to | ||
313 | * @param cmd prefix of the command to send | ||
314 | * @param eh details about the event | ||
315 | */ | ||
341 | static void | 316 | static void |
342 | manage_subscribe (struct GNUNET_PQ_Context *db, | 317 | manage_subscribe (struct GNUNET_PQ_Context *db, |
343 | const char *cmd, | 318 | const char *cmd, |
@@ -351,6 +326,9 @@ manage_subscribe (struct GNUNET_PQ_Context *db, | |||
351 | cmd); | 326 | cmd); |
352 | end = sh_to_channel (&eh->sh, | 327 | end = sh_to_channel (&eh->sh, |
353 | end); | 328 | end); |
329 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, | ||
330 | "Executing PQ command `%s'\n", | ||
331 | sql); | ||
354 | result = PQexec (db->conn, | 332 | result = PQexec (db->conn, |
355 | sql); | 333 | sql); |
356 | if (PGRES_COMMAND_OK != PQresultStatus (result)) | 334 | if (PGRES_COMMAND_OK != PQresultStatus (result)) |
@@ -395,21 +373,41 @@ register_notify (void *cls, | |||
395 | 373 | ||
396 | 374 | ||
397 | void | 375 | void |
398 | GNUNET_PQ_event_reconnect_ (struct GNUNET_PQ_Context *db) | 376 | GNUNET_PQ_event_reconnect_ (struct GNUNET_PQ_Context *db, |
377 | int fd) | ||
399 | { | 378 | { |
400 | GNUNET_assert (0 == | 379 | if (! db->scheduler_on) |
401 | pthread_mutex_lock (&db->notify_lock)); | 380 | return; |
381 | scheduler_fd_cb (db, | ||
382 | fd); | ||
402 | GNUNET_CONTAINER_multishortmap_iterate (db->channel_map, | 383 | GNUNET_CONTAINER_multishortmap_iterate (db->channel_map, |
403 | ®ister_notify, | 384 | ®ister_notify, |
404 | db); | 385 | db); |
405 | GNUNET_assert (0 == | 386 | } |
406 | pthread_mutex_unlock (&db->notify_lock)); | 387 | |
388 | |||
389 | /** | ||
390 | * Function run on timeout for an event. Triggers | ||
391 | * the notification, but does NOT clear the handler. | ||
392 | * | ||
393 | * @param cls a `struct GNUNET_DB_EventHandler *` | ||
394 | */ | ||
395 | static void | ||
396 | event_timeout (void *cls) | ||
397 | { | ||
398 | struct GNUNET_DB_EventHandler *eh = cls; | ||
399 | |||
400 | eh->timeout_task = NULL; | ||
401 | eh->cb (eh->cb_cls, | ||
402 | NULL, | ||
403 | 0); | ||
407 | } | 404 | } |
408 | 405 | ||
409 | 406 | ||
410 | struct GNUNET_DB_EventHandler * | 407 | struct GNUNET_DB_EventHandler * |
411 | GNUNET_PQ_event_listen (struct GNUNET_PQ_Context *db, | 408 | GNUNET_PQ_event_listen (struct GNUNET_PQ_Context *db, |
412 | const struct GNUNET_DB_EventHeaderP *es, | 409 | const struct GNUNET_DB_EventHeaderP *es, |
410 | struct GNUNET_TIME_Relative timeout, | ||
413 | GNUNET_DB_EventCallback cb, | 411 | GNUNET_DB_EventCallback cb, |
414 | void *cb_cls) | 412 | void *cb_cls) |
415 | { | 413 | { |
@@ -422,28 +420,20 @@ GNUNET_PQ_event_listen (struct GNUNET_PQ_Context *db, | |||
422 | &eh->sh); | 420 | &eh->sh); |
423 | eh->cb = cb; | 421 | eh->cb = cb; |
424 | eh->cb_cls = cb_cls; | 422 | eh->cb_cls = cb_cls; |
425 | GNUNET_assert (0 == | ||
426 | pthread_mutex_lock (&db->notify_lock)); | ||
427 | was_zero = (0 == GNUNET_CONTAINER_multishortmap_size (db->channel_map)); | 423 | was_zero = (0 == GNUNET_CONTAINER_multishortmap_size (db->channel_map)); |
428 | GNUNET_assert (GNUNET_OK == | 424 | GNUNET_assert (GNUNET_OK == |
429 | GNUNET_CONTAINER_multishortmap_put (db->channel_map, | 425 | GNUNET_CONTAINER_multishortmap_put (db->channel_map, |
430 | &eh->sh, | 426 | &eh->sh, |
431 | eh, | 427 | eh, |
432 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE)); | 428 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE)); |
433 | if ( (NULL != db->sc) && | 429 | if (was_zero) |
434 | was_zero) | 430 | GNUNET_PQ_event_scheduler_start_ (db); |
435 | { | ||
436 | int fd = PQsocket (db->conn); | ||
437 | |||
438 | if (-1 != fd) | ||
439 | db->sc (db->sc_cls, | ||
440 | fd); | ||
441 | } | ||
442 | manage_subscribe (db, | 431 | manage_subscribe (db, |
443 | "LISTEN X", | 432 | "LISTEN X", |
444 | eh); | 433 | eh); |
445 | GNUNET_assert (0 == | 434 | eh->timeout_task = GNUNET_SCHEDULER_add_delayed (timeout, |
446 | pthread_mutex_unlock (&db->notify_lock)); | 435 | &event_timeout, |
436 | eh); | ||
447 | return eh; | 437 | return eh; |
448 | } | 438 | } |
449 | 439 | ||
@@ -453,8 +443,6 @@ GNUNET_PQ_event_listen_cancel (struct GNUNET_DB_EventHandler *eh) | |||
453 | { | 443 | { |
454 | struct GNUNET_PQ_Context *db = eh->db; | 444 | struct GNUNET_PQ_Context *db = eh->db; |
455 | 445 | ||
456 | GNUNET_assert (0 == | ||
457 | pthread_mutex_lock (&db->notify_lock)); | ||
458 | GNUNET_assert (GNUNET_OK == | 446 | GNUNET_assert (GNUNET_OK == |
459 | GNUNET_CONTAINER_multishortmap_remove (db->channel_map, | 447 | GNUNET_CONTAINER_multishortmap_remove (db->channel_map, |
460 | &eh->sh, | 448 | &eh->sh, |
@@ -463,14 +451,15 @@ GNUNET_PQ_event_listen_cancel (struct GNUNET_DB_EventHandler *eh) | |||
463 | manage_subscribe (db, | 451 | manage_subscribe (db, |
464 | "UNLISTEN X", | 452 | "UNLISTEN X", |
465 | eh); | 453 | eh); |
466 | if ( (NULL != db->sc) && | 454 | if (0 == GNUNET_CONTAINER_multishortmap_size (db->channel_map)) |
467 | (0 == GNUNET_CONTAINER_multishortmap_size (db->channel_map)) ) | 455 | { |
456 | GNUNET_PQ_event_scheduler_stop_ (db); | ||
457 | } | ||
458 | if (NULL != eh->timeout_task) | ||
468 | { | 459 | { |
469 | db->sc (db->sc_cls, | 460 | GNUNET_SCHEDULER_cancel (eh->timeout_task); |
470 | -1); | 461 | eh->timeout_task = NULL; |
471 | } | 462 | } |
472 | GNUNET_assert (0 == | ||
473 | pthread_mutex_unlock (&db->notify_lock)); | ||
474 | GNUNET_free (eh); | 463 | GNUNET_free (eh); |
475 | } | 464 | } |
476 | 465 | ||
diff --git a/src/pq/test_pq.c b/src/pq/test_pq.c index ffbb4d129..90b5c6489 100644 --- a/src/pq/test_pq.c +++ b/src/pq/test_pq.c | |||
@@ -240,63 +240,6 @@ run_queries (struct GNUNET_PQ_Context *db) | |||
240 | } | 240 | } |
241 | 241 | ||
242 | 242 | ||
243 | static void | ||
244 | event_cb (void *cls, | ||
245 | const void *extra, | ||
246 | size_t extra_size) | ||
247 | { | ||
248 | unsigned int *cnt = cls; | ||
249 | |||
250 | GNUNET_assert (5 == extra_size); | ||
251 | GNUNET_assert (0 == memcmp ("world", | ||
252 | extra, | ||
253 | 5)); | ||
254 | (*cnt)++; | ||
255 | } | ||
256 | |||
257 | |||
258 | /** | ||
259 | * Run subscribe/notify tests. | ||
260 | * | ||
261 | * @param db database handle | ||
262 | * @return 0 on success | ||
263 | */ | ||
264 | static int | ||
265 | test_notify (struct GNUNET_PQ_Context *db) | ||
266 | { | ||
267 | struct GNUNET_DB_EventHeaderP e1 = { | ||
268 | .size = htons (sizeof (e1)), | ||
269 | .type = htons (1) | ||
270 | }; | ||
271 | struct GNUNET_DB_EventHeaderP e2 = { | ||
272 | .size = htons (sizeof (e2)), | ||
273 | .type = htons (2) | ||
274 | }; | ||
275 | unsigned int called = 0; | ||
276 | struct GNUNET_DB_EventHandler *eh; | ||
277 | |||
278 | eh = GNUNET_PQ_event_listen (db, | ||
279 | &e1, | ||
280 | &event_cb, | ||
281 | &called); | ||
282 | GNUNET_assert (NULL != eh); | ||
283 | GNUNET_PQ_event_notify (db, | ||
284 | &e2, | ||
285 | "hello", | ||
286 | 5); | ||
287 | GNUNET_PQ_event_do_poll (db); | ||
288 | GNUNET_assert (0 == called); | ||
289 | GNUNET_PQ_event_notify (db, | ||
290 | &e1, | ||
291 | "world", | ||
292 | 5); | ||
293 | GNUNET_PQ_event_do_poll (db); | ||
294 | GNUNET_assert (1 == called); | ||
295 | GNUNET_PQ_event_listen_cancel (eh); | ||
296 | return 0; | ||
297 | } | ||
298 | |||
299 | |||
300 | /** | 243 | /** |
301 | * Task called on shutdown. | 244 | * Task called on shutdown. |
302 | * | 245 | * |
@@ -305,7 +248,6 @@ test_notify (struct GNUNET_PQ_Context *db) | |||
305 | static void | 248 | static void |
306 | event_end (void *cls) | 249 | event_end (void *cls) |
307 | { | 250 | { |
308 | GNUNET_PQ_event_scheduler_stop (db); | ||
309 | GNUNET_PQ_event_listen_cancel (eh); | 251 | GNUNET_PQ_event_listen_cancel (eh); |
310 | eh = NULL; | 252 | eh = NULL; |
311 | if (NULL != tt) | 253 | if (NULL != tt) |
@@ -368,9 +310,9 @@ sched_tests (void *cls) | |||
368 | tt = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS, | 310 | tt = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS, |
369 | &timeout_cb, | 311 | &timeout_cb, |
370 | NULL); | 312 | NULL); |
371 | GNUNET_PQ_event_scheduler_start (db); | ||
372 | eh = GNUNET_PQ_event_listen (db, | 313 | eh = GNUNET_PQ_event_listen (db, |
373 | &es, | 314 | &es, |
315 | GNUNET_TIME_UNIT_FOREVER_REL, | ||
374 | &event_sched_cb, | 316 | &event_sched_cb, |
375 | NULL); | 317 | NULL); |
376 | GNUNET_PQ_reconnect (db); | 318 | GNUNET_PQ_reconnect (db); |
@@ -404,7 +346,7 @@ main (int argc, | |||
404 | }; | 346 | }; |
405 | 347 | ||
406 | GNUNET_log_setup ("test-pq", | 348 | GNUNET_log_setup ("test-pq", |
407 | "WARNING", | 349 | "INFO", |
408 | NULL); | 350 | NULL); |
409 | db = GNUNET_PQ_connect ("postgres:///gnunetcheck", | 351 | db = GNUNET_PQ_connect ("postgres:///gnunetcheck", |
410 | NULL, | 352 | NULL, |
@@ -433,8 +375,6 @@ main (int argc, | |||
433 | return 1; | 375 | return 1; |
434 | } | 376 | } |
435 | ret = run_queries (db); | 377 | ret = run_queries (db); |
436 | ret |= test_notify (db); | ||
437 | ret |= test_notify (db); | ||
438 | if (0 != ret) | 378 | if (0 != ret) |
439 | { | 379 | { |
440 | GNUNET_break (0); | 380 | GNUNET_break (0); |