aboutsummaryrefslogtreecommitdiff
path: root/src/pq/pq_event.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/pq/pq_event.c')
-rw-r--r--src/pq/pq_event.c320
1 files changed, 228 insertions, 92 deletions
diff --git a/src/pq/pq_event.c b/src/pq/pq_event.c
index ecb942230..79a2e80c6 100644
--- a/src/pq/pq_event.c
+++ b/src/pq/pq_event.c
@@ -29,8 +29,8 @@
29 29
30/** 30/**
31 * Handle for an active LISTENer to the database. 31 * Handle for an active LISTENer to the database.
32 */ 32 */
33struct GNUNET_PQ_EventHandler 33struct GNUNET_DB_EventHandler
34{ 34{
35 /** 35 /**
36 * Channel name. 36 * Channel name.
@@ -40,7 +40,7 @@ struct GNUNET_PQ_EventHandler
40 /** 40 /**
41 * Function to call on events. 41 * Function to call on events.
42 */ 42 */
43 GNUNET_PQ_EventCallback cb; 43 GNUNET_DB_EventCallback cb;
44 44
45 /** 45 /**
46 * Closure for @e cb. 46 * Closure for @e cb.
@@ -51,9 +51,12 @@ struct GNUNET_PQ_EventHandler
51 * Database context this event handler is with. 51 * Database context this event handler is with.
52 */ 52 */
53 struct GNUNET_PQ_Context *db; 53 struct GNUNET_PQ_Context *db;
54};
55
56 54
55 /**
56 * Task to run on timeout.
57 */
58 struct GNUNET_SCHEDULER_Task *timeout_task;
59};
57 60
58 61
59/** 62/**
@@ -63,7 +66,7 @@ struct GNUNET_PQ_EventHandler
63 * @param[out] sh short hash to set 66 * @param[out] sh short hash to set
64 */ 67 */
65static void 68static void
66es_to_sh (const struct GNUNET_PQ_EventHeaderP *es, 69es_to_sh (const struct GNUNET_DB_EventHeaderP *es,
67 struct GNUNET_ShortHashCode *sh) 70 struct GNUNET_ShortHashCode *sh)
68{ 71{
69 struct GNUNET_HashCode h_channel; 72 struct GNUNET_HashCode h_channel;
@@ -103,6 +106,24 @@ sh_to_channel (struct GNUNET_ShortHashCode *sh,
103 106
104 107
105/** 108/**
109 * Convert @a sh to a Postgres identifier.
110 *
111 * @param identifier to convert
112 * @param[out] sh set to short hash
113 * @return #GNUNET_OK on success
114 */
115static enum GNUNET_GenericReturnValue
116channel_to_sh (const char *identifier,
117 struct GNUNET_ShortHashCode *sh)
118{
119 return GNUNET_STRINGS_string_to_data (identifier,
120 strlen (identifier),
121 sh,
122 sizeof (*sh));
123}
124
125
126/**
106 * Convert @a es to a Postgres identifier. 127 * Convert @a es to a Postgres identifier.
107 * 128 *
108 * @param es spec to hash to an identifier 129 * @param es spec to hash to an identifier
@@ -111,7 +132,7 @@ sh_to_channel (struct GNUNET_ShortHashCode *sh,
111 * @return end position of the identifier 132 * @return end position of the identifier
112 */ 133 */
113static char * 134static char *
114es_to_channel (const struct GNUNET_PQ_EventHeaderP *es, 135es_to_channel (const struct GNUNET_DB_EventHeaderP *es,
115 char identifier[64]) 136 char identifier[64])
116{ 137{
117 struct GNUNET_ShortHashCode sh; 138 struct GNUNET_ShortHashCode sh;
@@ -141,12 +162,12 @@ struct NotifyContext
141 162
142 163
143/** 164/**
144 * Function called on every event handler that 165 * Function called on every event handler that
145 * needs to be triggered. 166 * needs to be triggered.
146 * 167 *
147 * @param cls a `struct NotifyContext` 168 * @param cls a `struct NotifyContext`
148 * @param sh channel name 169 * @param sh channel name
149 * @param value a `struct GNUNET_PQ_EventHandler` 170 * @param value a `struct GNUNET_DB_EventHandler`
150 * @return #GNUNET_OK continue to iterate 171 * @return #GNUNET_OK continue to iterate
151 */ 172 */
152static int 173static int
@@ -155,45 +176,27 @@ do_notify (void *cls,
155 void *value) 176 void *value)
156{ 177{
157 struct NotifyContext *ctx = cls; 178 struct NotifyContext *ctx = cls;
158 struct GNUNET_PQ_EventHandler *eh = value; 179 struct GNUNET_DB_EventHandler *eh = value;
159 180
160 eh->cb (eh->cb_cls, 181 eh->cb (eh->cb_cls,
161 ctx->extra, 182 ctx->extra,
162 ctx->extra_size); 183 ctx->extra_size);
163 return GNUNET_OK; 184 return GNUNET_OK;
164}
165
166
167void
168GNUNET_PQ_event_set_socket_callback (struct GNUNET_PQ_Context *db,
169 GNUNET_PQ_SocketCallback sc,
170 void *sc_cls)
171{
172 int fd;
173
174 db->sc = sc;
175 db->sc_cls = sc_cls;
176 if (NULL == sc)
177 return;
178 GNUNET_assert (0 ==
179 pthread_mutex_lock (&db->notify_lock));
180 fd = PQsocket (db->conn);
181 if ( (-1 != fd) &&
182 (0 != GNUNET_CONTAINER_multishortmap_size (db->channel_map)) )
183 sc (sc_cls,
184 fd);
185 GNUNET_assert (0 ==
186 pthread_mutex_unlock (&db->notify_lock));
187} 185}
188 186
189 187
190void 188static void
191GNUNET_PQ_event_do_poll (struct GNUNET_PQ_Context *db) 189event_do_poll (struct GNUNET_PQ_Context *db)
192{ 190{
193 PGnotify *n; 191 PGnotify *n;
194 192
195 GNUNET_assert (0 == 193 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
196 pthread_mutex_lock (&db->notify_lock)); 194 "PG poll job active\n");
195 if (1 !=
196 PQconsumeInput (db->conn))
197 GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
198 "Failed to read from Postgres: %s\n",
199 PQerrorMessage (db->conn));
197 while (NULL != (n = PQnotifies (db->conn))) 200 while (NULL != (n = PQnotifies (db->conn)))
198 { 201 {
199 struct GNUNET_ShortHashCode sh; 202 struct GNUNET_ShortHashCode sh;
@@ -201,15 +204,22 @@ GNUNET_PQ_event_do_poll (struct GNUNET_PQ_Context *db)
201 .extra = NULL 204 .extra = NULL
202 }; 205 };
203 206
207 if ('X' != toupper ((int) n->relname[0]))
208 {
209 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
210 "Ignoring notification for unsupported channel identifier `%s'\n",
211 n->relname);
212 PQfreemem (n);
213 continue;
214 }
204 if (GNUNET_OK != 215 if (GNUNET_OK !=
205 GNUNET_STRINGS_string_to_data (n->relname, 216 channel_to_sh (&n->relname[1],
206 strlen (n->relname), 217 &sh))
207 &sh,
208 sizeof (sh)))
209 { 218 {
210 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 219 GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
211 "Ignoring notification for unsupported channel identifier `%s'\n", 220 "Ignoring notification for unsupported channel identifier `%s'\n",
212 n->relname); 221 n->relname);
222 PQfreemem (n);
213 continue; 223 continue;
214 } 224 }
215 if ( (NULL != n->extra) && 225 if ( (NULL != n->extra) &&
@@ -223,36 +233,97 @@ GNUNET_PQ_event_do_poll (struct GNUNET_PQ_Context *db)
223 "Ignoring notification for unsupported extra data `%s' on channel `%s'\n", 233 "Ignoring notification for unsupported extra data `%s' on channel `%s'\n",
224 n->extra, 234 n->extra,
225 n->relname); 235 n->relname);
236 PQfreemem (n);
226 continue; 237 continue;
227 } 238 }
228 GNUNET_CONTAINER_multishortmap_iterate (db->channel_map, 239 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
229 &do_notify, 240 "Received notification %s with extra data `%.*s'\n",
230 &ctx); 241 n->relname,
242 (int) ctx.extra_size,
243 (const char *) ctx.extra);
244 GNUNET_CONTAINER_multishortmap_get_multiple (db->channel_map,
245 &sh,
246 &do_notify,
247 &ctx);
231 GNUNET_free (ctx.extra); 248 GNUNET_free (ctx.extra);
249 PQfreemem (n);
232 } 250 }
233 GNUNET_assert (0 ==
234 pthread_mutex_unlock (&db->notify_lock));
235} 251}
236 252
237 253
238void 254/**
239GNUNET_PQ_event_scheduler_start (struct GNUNET_PQ_Context *db) 255 * The GNUnet scheduler notifies us that we need to
256 * trigger the DB event poller.
257 *
258 * @param cls a `struct GNUNET_PQ_Context *`
259 */
260static void
261do_scheduler_notify (void *cls)
240{ 262{
241 GNUNET_break (0); // FIXME: not implemented 263 struct GNUNET_PQ_Context *db = cls;
264
265 db->event_task = NULL;
266 GNUNET_assert (NULL != db->rfd);
267 event_do_poll (db);
268 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
269 "Resubscribing\n");
270 db->event_task
271 = GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_FOREVER_REL,
272 db->rfd,
273 &do_scheduler_notify,
274 db);
242} 275}
243 276
244 277
245void 278/**
246GNUNET_PQ_event_scheduler_stop (struct GNUNET_PQ_Context *db) 279 * Function called when the Postgres FD changes and we need
280 * to update the scheduler event loop task.
281 *
282 * @param cls a `struct GNUNET_PQ_Context *`
283 * @param fd the file descriptor, possibly -1
284 */
285static void
286scheduler_fd_cb (void *cls,
287 int fd)
247{ 288{
248 GNUNET_break (0); // FIXME: not implemented 289 struct GNUNET_PQ_Context *db = cls;
290
291 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
292 "New poll FD is %d\n",
293 fd);
294 if (NULL != db->event_task)
295 {
296 GNUNET_SCHEDULER_cancel (db->event_task);
297 db->event_task = NULL;
298 }
299 GNUNET_free (db->rfd);
300 if (-1 == fd)
301 return;
302 if (0 == GNUNET_CONTAINER_multishortmap_size (db->channel_map))
303 return;
304 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
305 "Activating poll job on %d\n",
306 fd);
307 db->rfd = GNUNET_NETWORK_socket_box_native (fd);
308 db->event_task
309 = GNUNET_SCHEDULER_add_read_net (GNUNET_TIME_UNIT_ZERO,
310 db->rfd,
311 &do_scheduler_notify,
312 db);
249} 313}
250 314
251 315
316/**
317 * Helper function to trigger an SQL @a cmd on @a db
318 *
319 * @param db database to send command to
320 * @param cmd prefix of the command to send
321 * @param eh details about the event
322 */
252static void 323static void
253manage_subscribe (struct GNUNET_PQ_Context *db, 324manage_subscribe (struct GNUNET_PQ_Context *db,
254 const char *cmd, 325 const char *cmd,
255 struct GNUNET_PQ_EventHandler *eh) 326 struct GNUNET_DB_EventHandler *eh)
256{ 327{
257 char sql[16 + 64]; 328 char sql[16 + 64];
258 char *end; 329 char *end;
@@ -262,6 +333,9 @@ manage_subscribe (struct GNUNET_PQ_Context *db,
262 cmd); 333 cmd);
263 end = sh_to_channel (&eh->sh, 334 end = sh_to_channel (&eh->sh,
264 end); 335 end);
336 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
337 "Executing PQ command `%s'\n",
338 sql);
265 result = PQexec (db->conn, 339 result = PQexec (db->conn,
266 sql); 340 sql);
267 if (PGRES_COMMAND_OK != PQresultStatus (result)) 341 if (PGRES_COMMAND_OK != PQresultStatus (result))
@@ -282,76 +356,134 @@ manage_subscribe (struct GNUNET_PQ_Context *db,
282} 356}
283 357
284 358
285struct GNUNET_PQ_EventHandler * 359/**
360 * Re-subscribe to notifications after disconnect.
361 *
362 * @param cls the DB context
363 * @param sh the short hash of the channel
364 * @param eh the event handler
365 * @return #GNUNET_OK to continue to iterate
366 */
367static int
368register_notify (void *cls,
369 const struct GNUNET_ShortHashCode *sh,
370 void *value)
371{
372 struct GNUNET_PQ_Context *db = cls;
373 struct GNUNET_DB_EventHandler *eh = value;
374
375 manage_subscribe (db,
376 "LISTEN X",
377 eh);
378 return GNUNET_OK;
379}
380
381
382void
383GNUNET_PQ_event_reconnect_ (struct GNUNET_PQ_Context *db,
384 int fd)
385{
386 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
387 "Change in PQ event FD to %d\n",
388 fd);
389 scheduler_fd_cb (db,
390 fd);
391 GNUNET_CONTAINER_multishortmap_iterate (db->channel_map,
392 &register_notify,
393 db);
394}
395
396
397/**
398 * Function run on timeout for an event. Triggers
399 * the notification, but does NOT clear the handler.
400 *
401 * @param cls a `struct GNUNET_DB_EventHandler *`
402 */
403static void
404event_timeout (void *cls)
405{
406 struct GNUNET_DB_EventHandler *eh = cls;
407
408 eh->timeout_task = NULL;
409 eh->cb (eh->cb_cls,
410 NULL,
411 0);
412}
413
414
415struct GNUNET_DB_EventHandler *
286GNUNET_PQ_event_listen (struct GNUNET_PQ_Context *db, 416GNUNET_PQ_event_listen (struct GNUNET_PQ_Context *db,
287 const struct GNUNET_PQ_EventHeaderP *es, 417 const struct GNUNET_DB_EventHeaderP *es,
288 GNUNET_PQ_EventCallback cb, 418 struct GNUNET_TIME_Relative timeout,
419 GNUNET_DB_EventCallback cb,
289 void *cb_cls) 420 void *cb_cls)
290{ 421{
291 struct GNUNET_PQ_EventHandler *eh; 422 struct GNUNET_DB_EventHandler *eh;
292 bool was_zero;
293 423
294 eh = GNUNET_new (struct GNUNET_PQ_EventHandler); 424 eh = GNUNET_new (struct GNUNET_DB_EventHandler);
295 eh->db = db; 425 eh->db = db;
296 es_to_sh (es, 426 es_to_sh (es,
297 &eh->sh); 427 &eh->sh);
298 eh->cb = cb; 428 eh->cb = cb;
299 eh->cb_cls = cb_cls; 429 eh->cb_cls = cb_cls;
300 GNUNET_assert (0 ==
301 pthread_mutex_lock (&db->notify_lock));
302 was_zero = (0 == GNUNET_CONTAINER_multishortmap_size (db->channel_map));
303 GNUNET_assert (GNUNET_OK == 430 GNUNET_assert (GNUNET_OK ==
304 GNUNET_CONTAINER_multishortmap_put (db->channel_map, 431 GNUNET_CONTAINER_multishortmap_put (db->channel_map,
305 &eh->sh, 432 &eh->sh,
306 eh, 433 eh,
307 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE)); 434 GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE));
308 if ( (NULL != db->sc) && 435 if (NULL == db->event_task)
309 was_zero)
310 { 436 {
311 int fd = PQsocket (db->conn); 437 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
312 438 "Starting event scheduler\n");
313 if (-1 != fd) 439 scheduler_fd_cb (db,
314 db->sc (db->sc_cls, 440 PQsocket (db->conn));
315 fd);
316 } 441 }
317 manage_subscribe (db, 442 manage_subscribe (db,
318 "LISTEN ", 443 "LISTEN X",
319 eh); 444 eh);
320 GNUNET_assert (0 == 445 eh->timeout_task = GNUNET_SCHEDULER_add_delayed (timeout,
321 pthread_mutex_unlock (&db->notify_lock)); 446 &event_timeout,
447 eh);
322 return eh; 448 return eh;
323} 449}
324 450
325 451
326void 452void
327GNUNET_PQ_event_listen_cancel (struct GNUNET_PQ_EventHandler *eh) 453GNUNET_PQ_event_listen_cancel (struct GNUNET_DB_EventHandler *eh)
328{ 454{
329 struct GNUNET_PQ_Context *db = eh->db; 455 struct GNUNET_PQ_Context *db = eh->db;
330 456
331 GNUNET_assert (0 ==
332 pthread_mutex_lock (&db->notify_lock));
333 GNUNET_assert (GNUNET_OK == 457 GNUNET_assert (GNUNET_OK ==
334 GNUNET_CONTAINER_multishortmap_remove (db->channel_map, 458 GNUNET_CONTAINER_multishortmap_remove (db->channel_map,
335 &eh->sh, 459 &eh->sh,
336 eh)); 460 eh));
337
338 manage_subscribe (db, 461 manage_subscribe (db,
339 "UNLISTEN ", 462 "UNLISTEN X",
340 eh); 463 eh);
341 if ( (NULL != db->sc) && 464 if (0 == GNUNET_CONTAINER_multishortmap_size (db->channel_map))
342 (0 == GNUNET_CONTAINER_multishortmap_size (db->channel_map)) ) 465 {
466 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
467 "Stopping PQ event scheduler job\n");
468 GNUNET_free (db->rfd);
469 if (NULL != db->event_task)
470 {
471 GNUNET_SCHEDULER_cancel (db->event_task);
472 db->event_task = NULL;
473 }
474 }
475 if (NULL != eh->timeout_task)
343 { 476 {
344 db->sc (db->sc_cls, 477 GNUNET_SCHEDULER_cancel (eh->timeout_task);
345 -1); 478 eh->timeout_task = NULL;
346 } 479 }
347 GNUNET_assert (0 == 480 GNUNET_free (eh);
348 pthread_mutex_unlock (&db->notify_lock));
349} 481}
350 482
351 483
352void 484void
353GNUNET_PQ_event_notify (struct GNUNET_PQ_Context *db, 485GNUNET_PQ_event_notify (struct GNUNET_PQ_Context *db,
354 const struct GNUNET_PQ_EventHeaderP *es, 486 const struct GNUNET_DB_EventHeaderP *es,
355 const void *extra, 487 const void *extra,
356 size_t extra_size) 488 size_t extra_size)
357{ 489{
@@ -360,11 +492,11 @@ GNUNET_PQ_event_notify (struct GNUNET_PQ_Context *db,
360 PGresult *result; 492 PGresult *result;
361 493
362 end = stpcpy (sql, 494 end = stpcpy (sql,
363 "NOTIFY "); 495 "NOTIFY X");
364 end = es_to_channel (es, 496 end = es_to_channel (es,
365 end); 497 end);
366 end = stpcpy (end, 498 end = stpcpy (end,
367 "'"); 499 ", '");
368 end = GNUNET_STRINGS_data_to_string (extra, 500 end = GNUNET_STRINGS_data_to_string (extra,
369 extra_size, 501 extra_size,
370 end, 502 end,
@@ -373,6 +505,9 @@ GNUNET_PQ_event_notify (struct GNUNET_PQ_Context *db,
373 *end = '\0'; 505 *end = '\0';
374 end = stpcpy (end, 506 end = stpcpy (end,
375 "'"); 507 "'");
508 GNUNET_log (GNUNET_ERROR_TYPE_INFO,
509 "Executing command `%s'\n",
510 sql);
376 result = PQexec (db->conn, 511 result = PQexec (db->conn,
377 sql); 512 sql);
378 if (PGRES_COMMAND_OK != PQresultStatus (result)) 513 if (PGRES_COMMAND_OK != PQresultStatus (result))
@@ -390,7 +525,8 @@ GNUNET_PQ_event_notify (struct GNUNET_PQ_Context *db,
390 PQerrorMessage (db->conn)); 525 PQerrorMessage (db->conn));
391 } 526 }
392 PQclear (result); 527 PQclear (result);
528 event_do_poll (db);
393} 529}
394 530
395/* end of pq_event.c */
396 531
532/* end of pq_event.c */