diff options
-rw-r--r-- | src/include/gnunet_pq_lib.h | 49 | ||||
-rw-r--r-- | src/pq/Makefile.am | 1 | ||||
-rw-r--r-- | src/pq/pq.h | 40 | ||||
-rw-r--r-- | src/pq/pq_connect.c | 17 | ||||
-rw-r--r-- | src/pq/pq_event.c | 153 | ||||
-rw-r--r-- | src/pq/test_pq.c | 64 | ||||
-rw-r--r-- | src/util/network.c | 28 | ||||
-rw-r--r-- | src/util/os_priority.c | 4 |
8 files changed, 118 insertions, 238 deletions
diff --git a/src/include/gnunet_pq_lib.h b/src/include/gnunet_pq_lib.h index ecc2b9719..ff4498938 100644 --- a/src/include/gnunet_pq_lib.h +++ b/src/include/gnunet_pq_lib.h | |||
@@ -853,33 +853,6 @@ GNUNET_PQ_reconnect (struct GNUNET_PQ_Context *db); | |||
853 | 853 | ||
854 | 854 | ||
855 | /** | 855 | /** |
856 | * Function called whenever the socket needed for | ||
857 | * notifications from postgres changes. | ||
858 | * | ||
859 | * @param cls closure | ||
860 | * @param fd socket to listen on, -1 for none | ||
861 | */ | ||
862 | typedef void | ||
863 | (*GNUNET_PQ_SocketCallback)(void *cls, | ||
864 | int fd); | ||
865 | |||
866 | |||
867 | /** | ||
868 | * Obtain the file descriptor to poll on for notifications. | ||
869 | * Useful if the GNUnet scheduler is NOT to be used for | ||
870 | * such notifications. | ||
871 | * | ||
872 | * @param db database handle | ||
873 | * @param sc function to call with the socket | ||
874 | * @param sc_cls closure for @a sc | ||
875 | */ | ||
876 | void | ||
877 | GNUNET_PQ_event_set_socket_callback (struct GNUNET_PQ_Context *db, | ||
878 | GNUNET_PQ_SocketCallback sc, | ||
879 | void *sc_cls); | ||
880 | |||
881 | |||
882 | /** | ||
883 | * Poll for database events now. Used if the event FD | 856 | * Poll for database events now. Used if the event FD |
884 | * is ready and the application wants to trigger applicable | 857 | * is ready and the application wants to trigger applicable |
885 | * events. | 858 | * events. |
@@ -893,24 +866,6 @@ GNUNET_PQ_event_do_poll (struct GNUNET_PQ_Context *db); | |||
893 | 866 | ||
894 | 867 | ||
895 | /** | 868 | /** |
896 | * Run poll event loop using the GNUnet scheduler. | ||
897 | * | ||
898 | * @param db database handle | ||
899 | */ | ||
900 | void | ||
901 | GNUNET_PQ_event_scheduler_start (struct GNUNET_PQ_Context *db); | ||
902 | |||
903 | |||
904 | /** | ||
905 | * Stop running poll event loop using the GNUnet scheduler. | ||
906 | * | ||
907 | * @param db database handle | ||
908 | */ | ||
909 | void | ||
910 | GNUNET_PQ_event_scheduler_stop (struct GNUNET_PQ_Context *db); | ||
911 | |||
912 | |||
913 | /** | ||
914 | * Register callback to be invoked on events of type @a es. | 869 | * Register callback to be invoked on events of type @a es. |
915 | * | 870 | * |
916 | * Unlike many other calls, this function is thread-safe | 871 | * Unlike many other calls, this function is thread-safe |
@@ -921,14 +876,16 @@ GNUNET_PQ_event_scheduler_stop (struct GNUNET_PQ_Context *db); | |||
921 | * | 876 | * |
922 | * @param db database context to use | 877 | * @param db database context to use |
923 | * @param es specification of the event to listen for | 878 | * @param es specification of the event to listen for |
879 | * @param timeout when to trigger @a cb based on timeout | ||
924 | * @param cb function to call when the event happens, possibly | 880 | * @param cb function to call when the event happens, possibly |
925 | * multiple times (until #GNUNET_PQ_event_listen_cancel() is invoked) | 881 | * multiple times (until #GNUNET_PQ_event_listen_cancel() is invoked), including on timeout |
926 | * @param cb_cls closure for @a cb | 882 | * @param cb_cls closure for @a cb |
927 | * @return handle useful to cancel the listener | 883 | * @return handle useful to cancel the listener |
928 | */ | 884 | */ |
929 | struct GNUNET_DB_EventHandler * | 885 | struct GNUNET_DB_EventHandler * |
930 | GNUNET_PQ_event_listen (struct GNUNET_PQ_Context *db, | 886 | GNUNET_PQ_event_listen (struct GNUNET_PQ_Context *db, |
931 | const struct GNUNET_DB_EventHeaderP *es, | 887 | const struct GNUNET_DB_EventHeaderP *es, |
888 | struct GNUNET_TIME_Relative timeout, | ||
932 | GNUNET_DB_EventCallback cb, | 889 | GNUNET_DB_EventCallback cb, |
933 | void *cb_cls); | 890 | void *cb_cls); |
934 | 891 | ||
diff --git a/src/pq/Makefile.am b/src/pq/Makefile.am index 0febac4ac..cbb123cbb 100644 --- a/src/pq/Makefile.am +++ b/src/pq/Makefile.am | |||
@@ -24,7 +24,6 @@ libgnunetpq_la_LIBADD = -lpq \ | |||
24 | libgnunetpq_la_LDFLAGS = \ | 24 | libgnunetpq_la_LDFLAGS = \ |
25 | $(POSTGRESQL_LDFLAGS) \ | 25 | $(POSTGRESQL_LDFLAGS) \ |
26 | $(GN_LIB_LDFLAGS) \ | 26 | $(GN_LIB_LDFLAGS) \ |
27 | -lpthread \ | ||
28 | -version-info 1:0:0 | 27 | -version-info 1:0:0 |
29 | 28 | ||
30 | if ENABLE_TEST_RUN | 29 | if ENABLE_TEST_RUN |
diff --git a/src/pq/pq.h b/src/pq/pq.h index 107fd116c..950d38220 100644 --- a/src/pq/pq.h +++ b/src/pq/pq.h | |||
@@ -28,6 +28,7 @@ | |||
28 | #include "gnunet_util_lib.h" | 28 | #include "gnunet_util_lib.h" |
29 | #include "gnunet_pq_lib.h" | 29 | #include "gnunet_pq_lib.h" |
30 | 30 | ||
31 | |||
31 | /** | 32 | /** |
32 | * Handle to Postgres database. | 33 | * Handle to Postgres database. |
33 | */ | 34 | */ |
@@ -59,26 +60,11 @@ struct GNUNET_PQ_Context | |||
59 | char *load_path; | 60 | char *load_path; |
60 | 61 | ||
61 | /** | 62 | /** |
62 | * Function to call on Postgres FDs. | ||
63 | */ | ||
64 | GNUNET_PQ_SocketCallback sc; | ||
65 | |||
66 | /** | ||
67 | * Closure for @e sc. | ||
68 | */ | ||
69 | void *sc_cls; | ||
70 | |||
71 | /** | ||
72 | * Map managing event subscriptions. | 63 | * Map managing event subscriptions. |
73 | */ | 64 | */ |
74 | struct GNUNET_CONTAINER_MultiShortmap *channel_map; | 65 | struct GNUNET_CONTAINER_MultiShortmap *channel_map; |
75 | 66 | ||
76 | /** | 67 | /** |
77 | * Lock to access @e channel_map. | ||
78 | */ | ||
79 | pthread_mutex_t notify_lock; | ||
80 | |||
81 | /** | ||
82 | * Task responsible for processing events. | 68 | * Task responsible for processing events. |
83 | */ | 69 | */ |
84 | struct GNUNET_SCHEDULER_Task *event_task; | 70 | struct GNUNET_SCHEDULER_Task *event_task; |
@@ -87,7 +73,7 @@ struct GNUNET_PQ_Context | |||
87 | * File descriptor wrapper for @e event_task. | 73 | * File descriptor wrapper for @e event_task. |
88 | */ | 74 | */ |
89 | struct GNUNET_NETWORK_Handle *rfd; | 75 | struct GNUNET_NETWORK_Handle *rfd; |
90 | 76 | ||
91 | /** | 77 | /** |
92 | * Is scheduling via the GNUnet scheduler desired? | 78 | * Is scheduling via the GNUnet scheduler desired? |
93 | */ | 79 | */ |
@@ -100,9 +86,29 @@ struct GNUNET_PQ_Context | |||
100 | * after a disconnect. | 86 | * after a disconnect. |
101 | * | 87 | * |
102 | * @param db the DB handle | 88 | * @param db the DB handle |
89 | * @param fd socket to listen on | ||
90 | */ | ||
91 | void | ||
92 | GNUNET_PQ_event_reconnect_ (struct GNUNET_PQ_Context *db, | ||
93 | int fd); | ||
94 | |||
95 | |||
96 | /** | ||
97 | * Run poll event loop using the GNUnet scheduler. | ||
98 | * | ||
99 | * @param db database handle | ||
100 | */ | ||
101 | void | ||
102 | GNUNET_PQ_event_scheduler_start_ (struct GNUNET_PQ_Context *db); | ||
103 | |||
104 | |||
105 | /** | ||
106 | * Stop running poll event loop using the GNUnet scheduler. | ||
107 | * | ||
108 | * @param db database handle | ||
103 | */ | 109 | */ |
104 | void | 110 | void |
105 | GNUNET_PQ_event_reconnect_ (struct GNUNET_PQ_Context *db); | 111 | GNUNET_PQ_event_scheduler_stop_ (struct GNUNET_PQ_Context *db); |
106 | 112 | ||
107 | 113 | ||
108 | #endif | 114 | #endif |
diff --git a/src/pq/pq_connect.c b/src/pq/pq_connect.c index 275fd7450..05e787939 100644 --- a/src/pq/pq_connect.c +++ b/src/pq/pq_connect.c | |||
@@ -103,9 +103,6 @@ GNUNET_PQ_connect (const char *config_str, | |||
103 | } | 103 | } |
104 | db->channel_map = GNUNET_CONTAINER_multishortmap_create (16, | 104 | db->channel_map = GNUNET_CONTAINER_multishortmap_create (16, |
105 | GNUNET_YES); | 105 | GNUNET_YES); |
106 | GNUNET_assert (0 == | ||
107 | pthread_mutex_init (&db->notify_lock, | ||
108 | NULL)); | ||
109 | GNUNET_PQ_reconnect (db); | 106 | GNUNET_PQ_reconnect (db); |
110 | if (NULL == db->conn) | 107 | if (NULL == db->conn) |
111 | { | 108 | { |
@@ -294,9 +291,8 @@ GNUNET_PQ_reconnect_if_down (struct GNUNET_PQ_Context *db) | |||
294 | void | 291 | void |
295 | GNUNET_PQ_reconnect (struct GNUNET_PQ_Context *db) | 292 | GNUNET_PQ_reconnect (struct GNUNET_PQ_Context *db) |
296 | { | 293 | { |
297 | if (NULL != db->sc) | 294 | GNUNET_PQ_event_reconnect_ (db, |
298 | db->sc (db->sc_cls, | 295 | -1); |
299 | -1); | ||
300 | if (NULL != db->conn) | 296 | if (NULL != db->conn) |
301 | PQfinish (db->conn); | 297 | PQfinish (db->conn); |
302 | db->conn = PQconnectdb (db->config_str); | 298 | db->conn = PQconnectdb (db->config_str); |
@@ -416,11 +412,8 @@ GNUNET_PQ_reconnect (struct GNUNET_PQ_Context *db) | |||
416 | db->conn = NULL; | 412 | db->conn = NULL; |
417 | return; | 413 | return; |
418 | } | 414 | } |
419 | GNUNET_PQ_event_reconnect_ (db); | 415 | GNUNET_PQ_event_reconnect_ (db, |
420 | if ( (NULL != db->sc) && | 416 | PQsocket (db->conn)); |
421 | (0 != GNUNET_CONTAINER_multishortmap_size (db->channel_map)) ) | ||
422 | db->sc (db->sc_cls, | ||
423 | PQsocket (db->conn)); | ||
424 | } | 417 | } |
425 | 418 | ||
426 | 419 | ||
@@ -473,8 +466,6 @@ GNUNET_PQ_disconnect (struct GNUNET_PQ_Context *db) | |||
473 | GNUNET_assert (0 == | 466 | GNUNET_assert (0 == |
474 | GNUNET_CONTAINER_multishortmap_size (db->channel_map)); | 467 | GNUNET_CONTAINER_multishortmap_size (db->channel_map)); |
475 | GNUNET_CONTAINER_multishortmap_destroy (db->channel_map); | 468 | GNUNET_CONTAINER_multishortmap_destroy (db->channel_map); |
476 | GNUNET_assert (0 == | ||
477 | pthread_mutex_destroy (&db->notify_lock)); | ||
478 | GNUNET_free (db->es); | 469 | GNUNET_free (db->es); |
479 | GNUNET_free (db->ps); | 470 | GNUNET_free (db->ps); |
480 | GNUNET_free (db->load_path); | 471 | GNUNET_free (db->load_path); |
diff --git a/src/pq/pq_event.c b/src/pq/pq_event.c index e6c2d07fd..3a0bfcde3 100644 --- a/src/pq/pq_event.c +++ b/src/pq/pq_event.c | |||
@@ -51,6 +51,11 @@ struct GNUNET_DB_EventHandler | |||
51 | * Database context this event handler is with. | 51 | * Database context this event handler is with. |
52 | */ | 52 | */ |
53 | struct GNUNET_PQ_Context *db; | 53 | struct GNUNET_PQ_Context *db; |
54 | |||
55 | /** | ||
56 | * Task to run on timeout. | ||
57 | */ | ||
58 | struct GNUNET_SCHEDULER_Task *timeout_task; | ||
54 | }; | 59 | }; |
55 | 60 | ||
56 | 61 | ||
@@ -163,35 +168,10 @@ do_notify (void *cls, | |||
163 | 168 | ||
164 | 169 | ||
165 | void | 170 | void |
166 | GNUNET_PQ_event_set_socket_callback (struct GNUNET_PQ_Context *db, | ||
167 | GNUNET_PQ_SocketCallback sc, | ||
168 | void *sc_cls) | ||
169 | { | ||
170 | int fd; | ||
171 | |||
172 | db->sc = sc; | ||
173 | db->sc_cls = sc_cls; | ||
174 | if (NULL == sc) | ||
175 | return; | ||
176 | GNUNET_assert (0 == | ||
177 | pthread_mutex_lock (&db->notify_lock)); | ||
178 | fd = PQsocket (db->conn); | ||
179 | if ( (-1 != fd) && | ||
180 | (0 != GNUNET_CONTAINER_multishortmap_size (db->channel_map)) ) | ||
181 | sc (sc_cls, | ||
182 | fd); | ||
183 | GNUNET_assert (0 == | ||
184 | pthread_mutex_unlock (&db->notify_lock)); | ||
185 | } | ||
186 | |||
187 | |||
188 | void | ||
189 | GNUNET_PQ_event_do_poll (struct GNUNET_PQ_Context *db) | 171 | GNUNET_PQ_event_do_poll (struct GNUNET_PQ_Context *db) |
190 | { | 172 | { |
191 | PGnotify *n; | 173 | PGnotify *n; |
192 | 174 | ||
193 | GNUNET_assert (0 == | ||
194 | pthread_mutex_lock (&db->notify_lock)); | ||
195 | if (1 != | 175 | if (1 != |
196 | PQconsumeInput (db->conn)) | 176 | PQconsumeInput (db->conn)) |
197 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, | 177 | GNUNET_log (GNUNET_ERROR_TYPE_ERROR, |
@@ -204,9 +184,17 @@ GNUNET_PQ_event_do_poll (struct GNUNET_PQ_Context *db) | |||
204 | .extra = NULL | 184 | .extra = NULL |
205 | }; | 185 | }; |
206 | 186 | ||
187 | if ('X' != toupper ((int) n->relname[0])) | ||
188 | { | ||
189 | GNUNET_log (GNUNET_ERROR_TYPE_WARNING, | ||
190 | "Ignoring notification for unsupported channel identifier `%s'\n", | ||
191 | n->relname); | ||
192 | PQfreemem (n); | ||
193 | continue; | ||
194 | } | ||
207 | if (GNUNET_OK != | 195 | if (GNUNET_OK != |
208 | GNUNET_STRINGS_string_to_data (n->relname, | 196 | GNUNET_STRINGS_string_to_data (&n->relname[1], |
209 | strlen (n->relname), | 197 | strlen (&n->relname[1]), |
210 | &sh, | 198 | &sh, |
211 | sizeof (sh))) | 199 | sizeof (sh))) |
212 | { | 200 | { |
@@ -236,24 +224,10 @@ GNUNET_PQ_event_do_poll (struct GNUNET_PQ_Context *db) | |||
236 | GNUNET_free (ctx.extra); | 224 | GNUNET_free (ctx.extra); |
237 | PQfreemem (n); | 225 | PQfreemem (n); |
238 | } | 226 | } |
239 | GNUNET_assert (0 == | ||
240 | pthread_mutex_unlock (&db->notify_lock)); | ||
241 | } | 227 | } |
242 | 228 | ||
243 | 229 | ||
244 | /** | 230 | /** |
245 | * Function called when the Postgres FD changes and we need | ||
246 | * to update the scheduler event loop task. | ||
247 | * | ||
248 | * @param cls a `struct GNUNET_PQ_Context *` | ||
249 | * @param fd the file descriptor, possibly -1 | ||
250 | */ | ||
251 | static void | ||
252 | scheduler_fd_cb (void *cls, | ||
253 | int fd); | ||
254 | |||
255 | |||
256 | /** | ||
257 | * The GNUnet scheduler notifies us that we need to | 231 | * The GNUnet scheduler notifies us that we need to |
258 | * trigger the DB event poller. | 232 | * trigger the DB event poller. |
259 | * | 233 | * |
@@ -308,27 +282,21 @@ scheduler_fd_cb (void *cls, | |||
308 | 282 | ||
309 | 283 | ||
310 | void | 284 | void |
311 | GNUNET_PQ_event_scheduler_start (struct GNUNET_PQ_Context *db) | 285 | GNUNET_PQ_event_scheduler_start_ (struct GNUNET_PQ_Context *db) |
312 | { | 286 | { |
313 | int fd; | 287 | if (db->scheduler_on) |
314 | 288 | return; | |
315 | GNUNET_assert (! db->scheduler_on); | ||
316 | GNUNET_assert (NULL == db->sc); | ||
317 | db->scheduler_on = true; | 289 | db->scheduler_on = true; |
318 | db->sc = &scheduler_fd_cb; | ||
319 | db->sc_cls = db; | ||
320 | fd = PQsocket (db->conn); | ||
321 | scheduler_fd_cb (db, | 290 | scheduler_fd_cb (db, |
322 | fd); | 291 | PQsocket (db->conn)); |
323 | } | 292 | } |
324 | 293 | ||
325 | 294 | ||
326 | void | 295 | void |
327 | GNUNET_PQ_event_scheduler_stop (struct GNUNET_PQ_Context *db) | 296 | GNUNET_PQ_event_scheduler_stop_ (struct GNUNET_PQ_Context *db) |
328 | { | 297 | { |
329 | GNUNET_assert (db->scheduler_on); | 298 | GNUNET_assert (db->scheduler_on); |
330 | GNUNET_free (db->rfd); | 299 | GNUNET_free (db->rfd); |
331 | db->sc = NULL; | ||
332 | db->scheduler_on = false; | 300 | db->scheduler_on = false; |
333 | if (NULL != db->event_task) | 301 | if (NULL != db->event_task) |
334 | { | 302 | { |
@@ -338,6 +306,13 @@ GNUNET_PQ_event_scheduler_stop (struct GNUNET_PQ_Context *db) | |||
338 | } | 306 | } |
339 | 307 | ||
340 | 308 | ||
309 | /** | ||
310 | * Helper function to trigger an SQL @a cmd on @a db | ||
311 | * | ||
312 | * @param db database to send command to | ||
313 | * @param cmd prefix of the command to send | ||
314 | * @param eh details about the event | ||
315 | */ | ||
341 | static void | 316 | static void |
342 | manage_subscribe (struct GNUNET_PQ_Context *db, | 317 | manage_subscribe (struct GNUNET_PQ_Context *db, |
343 | const char *cmd, | 318 | const char *cmd, |
@@ -351,6 +326,9 @@ manage_subscribe (struct GNUNET_PQ_Context *db, | |||
351 | cmd); | 326 | cmd); |
352 | end = sh_to_channel (&eh->sh, | 327 | end = sh_to_channel (&eh->sh, |
353 | end); | 328 | end); |
329 | GNUNET_log (GNUNET_ERROR_TYPE_INFO, | ||
330 | "Executing PQ command `%s'\n", | ||
331 | sql); | ||
354 | result = PQexec (db->conn, | 332 | result = PQexec (db->conn, |
355 | sql); | 333 | sql); |
356 | if (PGRES_COMMAND_OK != PQresultStatus (result)) | 334 | if (PGRES_COMMAND_OK != PQresultStatus (result)) |
@@ -388,28 +366,48 @@ register_notify (void *cls, | |||
388 | struct GNUNET_DB_EventHandler *eh = value; | 366 | struct GNUNET_DB_EventHandler *eh = value; |
389 | 367 | ||
390 | manage_subscribe (db, | 368 | manage_subscribe (db, |
391 | "LISTEN ", | 369 | "LISTEN X", |
392 | eh); | 370 | eh); |
393 | return GNUNET_OK; | 371 | return GNUNET_OK; |
394 | } | 372 | } |
395 | 373 | ||
396 | 374 | ||
397 | void | 375 | void |
398 | GNUNET_PQ_event_reconnect_ (struct GNUNET_PQ_Context *db) | 376 | GNUNET_PQ_event_reconnect_ (struct GNUNET_PQ_Context *db, |
377 | int fd) | ||
399 | { | 378 | { |
400 | GNUNET_assert (0 == | 379 | if (! db->scheduler_on) |
401 | pthread_mutex_lock (&db->notify_lock)); | 380 | return; |
381 | scheduler_fd_cb (db, | ||
382 | fd); | ||
402 | GNUNET_CONTAINER_multishortmap_iterate (db->channel_map, | 383 | GNUNET_CONTAINER_multishortmap_iterate (db->channel_map, |
403 | ®ister_notify, | 384 | ®ister_notify, |
404 | db); | 385 | db); |
405 | GNUNET_assert (0 == | 386 | } |
406 | pthread_mutex_unlock (&db->notify_lock)); | 387 | |
388 | |||
389 | /** | ||
390 | * Function run on timeout for an event. Triggers | ||
391 | * the notification, but does NOT clear the handler. | ||
392 | * | ||
393 | * @param cls a `struct GNUNET_DB_EventHandler *` | ||
394 | */ | ||
395 | static void | ||
396 | event_timeout (void *cls) | ||
397 | { | ||
398 | struct GNUNET_DB_EventHandler *eh = cls; | ||
399 | |||
400 | eh->timeout_task = NULL; | ||
401 | eh->cb (eh->cb_cls, | ||
402 | NULL, | ||
403 | 0); | ||
407 | } | 404 | } |
408 | 405 | ||
409 | 406 | ||
410 | struct GNUNET_DB_EventHandler * | 407 | struct GNUNET_DB_EventHandler * |
411 | GNUNET_PQ_event_listen (struct GNUNET_PQ_Context *db, | 408 | GNUNET_PQ_event_listen (struct GNUNET_PQ_Context *db, |
412 | const struct GNUNET_DB_EventHeaderP *es, | 409 | const struct GNUNET_DB_EventHeaderP *es, |
410 | struct GNUNET_TIME_Relative timeout, | ||
413 | GNUNET_DB_EventCallback cb, | 411 | GNUNET_DB_EventCallback cb, |
414 | void *cb_cls) | 412 | void *cb_cls) |
415 | { | 413 | { |
@@ -422,28 +420,20 @@ GNUNET_PQ_event_listen (struct GNUNET_PQ_Context *db, | |||
422 | &eh->sh); | 420 | &eh->sh); |
423 | eh->cb = cb; | 421 | eh->cb = cb; |
424 | eh->cb_cls = cb_cls; | 422 | eh->cb_cls = cb_cls; |
425 | GNUNET_assert (0 == | ||
426 | pthread_mutex_lock (&db->notify_lock)); | ||
427 | was_zero = (0 == GNUNET_CONTAINER_multishortmap_size (db->channel_map)); | 423 | was_zero = (0 == GNUNET_CONTAINER_multishortmap_size (db->channel_map)); |
428 | GNUNET_assert (GNUNET_OK == | 424 | GNUNET_assert (GNUNET_OK == |
429 | GNUNET_CONTAINER_multishortmap_put (db->channel_map, | 425 | GNUNET_CONTAINER_multishortmap_put (db->channel_map, |
430 | &eh->sh, | 426 | &eh->sh, |
431 | eh, | 427 | eh, |
432 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE)); | 428 | GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE)); |
433 | if ( (NULL != db->sc) && | 429 | if (was_zero) |
434 | was_zero) | 430 | GNUNET_PQ_event_scheduler_start_ (db); |
435 | { | ||
436 | int fd = PQsocket (db->conn); | ||
437 | |||
438 | if (-1 != fd) | ||
439 | db->sc (db->sc_cls, | ||
440 | fd); | ||
441 | } | ||
442 | manage_subscribe (db, | 431 | manage_subscribe (db, |
443 | "LISTEN ", | 432 | "LISTEN X", |
444 | eh); | 433 | eh); |
445 | GNUNET_assert (0 == | 434 | eh->timeout_task = GNUNET_SCHEDULER_add_delayed (timeout, |
446 | pthread_mutex_unlock (&db->notify_lock)); | 435 | &event_timeout, |
436 | eh); | ||
447 | return eh; | 437 | return eh; |
448 | } | 438 | } |
449 | 439 | ||
@@ -453,24 +443,23 @@ GNUNET_PQ_event_listen_cancel (struct GNUNET_DB_EventHandler *eh) | |||
453 | { | 443 | { |
454 | struct GNUNET_PQ_Context *db = eh->db; | 444 | struct GNUNET_PQ_Context *db = eh->db; |
455 | 445 | ||
456 | GNUNET_assert (0 == | ||
457 | pthread_mutex_lock (&db->notify_lock)); | ||
458 | GNUNET_assert (GNUNET_OK == | 446 | GNUNET_assert (GNUNET_OK == |
459 | GNUNET_CONTAINER_multishortmap_remove (db->channel_map, | 447 | GNUNET_CONTAINER_multishortmap_remove (db->channel_map, |
460 | &eh->sh, | 448 | &eh->sh, |
461 | eh)); | 449 | eh)); |
462 | 450 | ||
463 | manage_subscribe (db, | 451 | manage_subscribe (db, |
464 | "UNLISTEN ", | 452 | "UNLISTEN X", |
465 | eh); | 453 | eh); |
466 | if ( (NULL != db->sc) && | 454 | if (0 == GNUNET_CONTAINER_multishortmap_size (db->channel_map)) |
467 | (0 == GNUNET_CONTAINER_multishortmap_size (db->channel_map)) ) | 455 | { |
456 | GNUNET_PQ_event_scheduler_stop_ (db); | ||
457 | } | ||
458 | if (NULL != eh->timeout_task) | ||
468 | { | 459 | { |
469 | db->sc (db->sc_cls, | 460 | GNUNET_SCHEDULER_cancel (eh->timeout_task); |
470 | -1); | 461 | eh->timeout_task = NULL; |
471 | } | 462 | } |
472 | GNUNET_assert (0 == | ||
473 | pthread_mutex_unlock (&db->notify_lock)); | ||
474 | GNUNET_free (eh); | 463 | GNUNET_free (eh); |
475 | } | 464 | } |
476 | 465 | ||
@@ -486,7 +475,7 @@ GNUNET_PQ_event_notify (struct GNUNET_PQ_Context *db, | |||
486 | PGresult *result; | 475 | PGresult *result; |
487 | 476 | ||
488 | end = stpcpy (sql, | 477 | end = stpcpy (sql, |
489 | "NOTIFY "); | 478 | "NOTIFY X"); |
490 | end = es_to_channel (es, | 479 | end = es_to_channel (es, |
491 | end); | 480 | end); |
492 | end = stpcpy (end, | 481 | end = stpcpy (end, |
diff --git a/src/pq/test_pq.c b/src/pq/test_pq.c index ffbb4d129..90b5c6489 100644 --- a/src/pq/test_pq.c +++ b/src/pq/test_pq.c | |||
@@ -240,63 +240,6 @@ run_queries (struct GNUNET_PQ_Context *db) | |||
240 | } | 240 | } |
241 | 241 | ||
242 | 242 | ||
243 | static void | ||
244 | event_cb (void *cls, | ||
245 | const void *extra, | ||
246 | size_t extra_size) | ||
247 | { | ||
248 | unsigned int *cnt = cls; | ||
249 | |||
250 | GNUNET_assert (5 == extra_size); | ||
251 | GNUNET_assert (0 == memcmp ("world", | ||
252 | extra, | ||
253 | 5)); | ||
254 | (*cnt)++; | ||
255 | } | ||
256 | |||
257 | |||
258 | /** | ||
259 | * Run subscribe/notify tests. | ||
260 | * | ||
261 | * @param db database handle | ||
262 | * @return 0 on success | ||
263 | */ | ||
264 | static int | ||
265 | test_notify (struct GNUNET_PQ_Context *db) | ||
266 | { | ||
267 | struct GNUNET_DB_EventHeaderP e1 = { | ||
268 | .size = htons (sizeof (e1)), | ||
269 | .type = htons (1) | ||
270 | }; | ||
271 | struct GNUNET_DB_EventHeaderP e2 = { | ||
272 | .size = htons (sizeof (e2)), | ||
273 | .type = htons (2) | ||
274 | }; | ||
275 | unsigned int called = 0; | ||
276 | struct GNUNET_DB_EventHandler *eh; | ||
277 | |||
278 | eh = GNUNET_PQ_event_listen (db, | ||
279 | &e1, | ||
280 | &event_cb, | ||
281 | &called); | ||
282 | GNUNET_assert (NULL != eh); | ||
283 | GNUNET_PQ_event_notify (db, | ||
284 | &e2, | ||
285 | "hello", | ||
286 | 5); | ||
287 | GNUNET_PQ_event_do_poll (db); | ||
288 | GNUNET_assert (0 == called); | ||
289 | GNUNET_PQ_event_notify (db, | ||
290 | &e1, | ||
291 | "world", | ||
292 | 5); | ||
293 | GNUNET_PQ_event_do_poll (db); | ||
294 | GNUNET_assert (1 == called); | ||
295 | GNUNET_PQ_event_listen_cancel (eh); | ||
296 | return 0; | ||
297 | } | ||
298 | |||
299 | |||
300 | /** | 243 | /** |
301 | * Task called on shutdown. | 244 | * Task called on shutdown. |
302 | * | 245 | * |
@@ -305,7 +248,6 @@ test_notify (struct GNUNET_PQ_Context *db) | |||
305 | static void | 248 | static void |
306 | event_end (void *cls) | 249 | event_end (void *cls) |
307 | { | 250 | { |
308 | GNUNET_PQ_event_scheduler_stop (db); | ||
309 | GNUNET_PQ_event_listen_cancel (eh); | 251 | GNUNET_PQ_event_listen_cancel (eh); |
310 | eh = NULL; | 252 | eh = NULL; |
311 | if (NULL != tt) | 253 | if (NULL != tt) |
@@ -368,9 +310,9 @@ sched_tests (void *cls) | |||
368 | tt = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS, | 310 | tt = GNUNET_SCHEDULER_add_delayed (GNUNET_TIME_UNIT_SECONDS, |
369 | &timeout_cb, | 311 | &timeout_cb, |
370 | NULL); | 312 | NULL); |
371 | GNUNET_PQ_event_scheduler_start (db); | ||
372 | eh = GNUNET_PQ_event_listen (db, | 313 | eh = GNUNET_PQ_event_listen (db, |
373 | &es, | 314 | &es, |
315 | GNUNET_TIME_UNIT_FOREVER_REL, | ||
374 | &event_sched_cb, | 316 | &event_sched_cb, |
375 | NULL); | 317 | NULL); |
376 | GNUNET_PQ_reconnect (db); | 318 | GNUNET_PQ_reconnect (db); |
@@ -404,7 +346,7 @@ main (int argc, | |||
404 | }; | 346 | }; |
405 | 347 | ||
406 | GNUNET_log_setup ("test-pq", | 348 | GNUNET_log_setup ("test-pq", |
407 | "WARNING", | 349 | "INFO", |
408 | NULL); | 350 | NULL); |
409 | db = GNUNET_PQ_connect ("postgres:///gnunetcheck", | 351 | db = GNUNET_PQ_connect ("postgres:///gnunetcheck", |
410 | NULL, | 352 | NULL, |
@@ -433,8 +375,6 @@ main (int argc, | |||
433 | return 1; | 375 | return 1; |
434 | } | 376 | } |
435 | ret = run_queries (db); | 377 | ret = run_queries (db); |
436 | ret |= test_notify (db); | ||
437 | ret |= test_notify (db); | ||
438 | if (0 != ret) | 378 | if (0 != ret) |
439 | { | 379 | { |
440 | GNUNET_break (0); | 380 | GNUNET_break (0); |
diff --git a/src/util/network.c b/src/util/network.c index 61da37ab7..014701e02 100644 --- a/src/util/network.c +++ b/src/util/network.c | |||
@@ -506,15 +506,15 @@ GNUNET_NETWORK_socket_bind (struct GNUNET_NETWORK_Handle *desc, | |||
506 | #endif | 506 | #endif |
507 | if (AF_UNIX == address->sa_family) | 507 | if (AF_UNIX == address->sa_family) |
508 | GNUNET_NETWORK_unix_precheck ((const struct sockaddr_un *) address); | 508 | GNUNET_NETWORK_unix_precheck ((const struct sockaddr_un *) address); |
509 | |||
509 | { | 510 | { |
510 | const int on = 1; | 511 | const int on = 1; |
511 | 512 | ||
512 | /* This is required here for TCP sockets, but only on UNIX */ | 513 | if ( (SOCK_STREAM == desc->type) && |
513 | if ((SOCK_STREAM == desc->type) && | 514 | (0 != setsockopt (desc->fd, |
514 | (0 != setsockopt (desc->fd, | 515 | SOL_SOCKET, |
515 | SOL_SOCKET, | 516 | SO_REUSEADDR, |
516 | SO_REUSEADDR, | 517 | &on, sizeof(on))) ) |
517 | &on, sizeof(on)))) | ||
518 | LOG_STRERROR (GNUNET_ERROR_TYPE_DEBUG, | 518 | LOG_STRERROR (GNUNET_ERROR_TYPE_DEBUG, |
519 | "setsockopt"); | 519 | "setsockopt"); |
520 | } | 520 | } |
@@ -883,15 +883,13 @@ GNUNET_NETWORK_socket_setsockopt (struct GNUNET_NETWORK_Handle *fd, | |||
883 | const void *option_value, | 883 | const void *option_value, |
884 | socklen_t option_len) | 884 | socklen_t option_len) |
885 | { | 885 | { |
886 | int ret; | 886 | return (0 == setsockopt (fd->fd, |
887 | 887 | level, | |
888 | ret = setsockopt (fd->fd, | 888 | option_name, |
889 | level, | 889 | option_value, |
890 | option_name, | 890 | option_len)) |
891 | option_value, | 891 | ? GNUNET_OK |
892 | option_len); | 892 | : GNUNET_SYSERR; |
893 | |||
894 | return ret == 0 ? GNUNET_OK : GNUNET_SYSERR; | ||
895 | } | 893 | } |
896 | 894 | ||
897 | 895 | ||
diff --git a/src/util/os_priority.c b/src/util/os_priority.c index dc2f0f97e..08320b291 100644 --- a/src/util/os_priority.c +++ b/src/util/os_priority.c | |||
@@ -47,7 +47,6 @@ struct GNUNET_OS_Process | |||
47 | */ | 47 | */ |
48 | pid_t pid; | 48 | pid_t pid; |
49 | 49 | ||
50 | |||
51 | /** | 50 | /** |
52 | * Pipe we use to signal the process. | 51 | * Pipe we use to signal the process. |
53 | * NULL if unused, or if process was deemed uncontrollable. | 52 | * NULL if unused, or if process was deemed uncontrollable. |
@@ -301,7 +300,8 @@ GNUNET_OS_process_destroy (struct GNUNET_OS_Process *proc) | |||
301 | * @param flags open flags (O_RDONLY, O_WRONLY) | 300 | * @param flags open flags (O_RDONLY, O_WRONLY) |
302 | */ | 301 | */ |
303 | static void | 302 | static void |
304 | open_dev_null (int target_fd, int flags) | 303 | open_dev_null (int target_fd, |
304 | int flags) | ||
305 | { | 305 | { |
306 | int fd; | 306 | int fd; |
307 | 307 | ||