taler-rust

GNU Taler code in Rust. Largely core banking integrations.
Log | Files | Refs | Submodules | README | LICENSE

commit cc7dc52133342763c69867fc3c47166f4ffe01b7
parent f63c735f81d8e85288d911db085ea26fcda28ad7
Author: Antoine A <>
Date:   Wed, 20 May 2026 10:22:49 +0200

common: improve test routine and db notification

Diffstat:
Mcommon/taler-api/src/db.rs | 62+++++++++++++++++++++++++++++++++++++++-----------------------
Mcommon/taler-api/src/notification.rs | 49+++++++++++++++++++++++++++++++------------------
Mcommon/taler-api/src/notification/de.rs | 272+++++++++++++++++++++++++++++++++++++------------------------------------------
Mcommon/taler-api/src/test/db.rs | 2+-
Mcommon/taler-common/src/api_params.rs | 43+++++++++++++++++++++++++++++++++++--------
Mcommon/taler-test-utils/src/routine.rs | 6+++---
Mtaler-cyclos/src/bin/cyclos-harness.rs | 8++++----
Mtaler-cyclos/src/db.rs | 2+-
Mtaler-magnet-bank/src/bin/magnet-bank-harness.rs | 4++--
Mtaler-magnet-bank/src/db.rs | 2+-
10 files changed, 244 insertions(+), 206 deletions(-)

diff --git a/common/taler-api/src/db.rs b/common/taler-api/src/db.rs @@ -29,7 +29,7 @@ use sqlx::{ }; use taler_common::{ api_common::SafeU64, - api_params::{History, Page}, + api_params::{History, Page, Pooling}, types::{ amount::{Amount, Currency, Decimal}, iban::IBAN, @@ -140,31 +140,22 @@ pub async fn page<'a, 'b, R: Send + Unpin>( }) } -pub async fn history<'a, 'b, T: Send + Unpin>( - pool: &PgPool, - id_col: &str, - params: &History, - listen: impl FnOnce() -> watch::Receiver<i64>, - prepare: impl Fn() -> QueryBuilder<'a, Postgres> + Copy, - map: impl Fn(PgRow) -> Result<T, Error> + Send + Copy, -) -> Result<Vec<T>, Error> { - let load = async || page(pool, &params.page, id_col, prepare, map).await; - - // When going backward there is always at least one transaction or none - if params.page.limit >= 0 && params.timeout_ms.is_some_and(|it| it > 0) { +pub async fn pooling<R, N, F: Future<Output = sqlx::Result<R>>>( + params: &Pooling, + listen: impl FnOnce() -> watch::Receiver<N>, + filter: impl FnMut(&N) -> bool, + mut load: impl FnMut() -> F, + mut check: impl FnMut(&R) -> bool, +) -> Result<R, Error> { + let timeout = params.timeout_ms.unwrap_or_default(); + if timeout > 0 { let mut listener = listen(); let init = load().await?; // Long polling if we found no transactions - if init.is_empty() { - let pooling = tokio::time::timeout( - Duration::from_millis(params.timeout_ms.unwrap_or(0)), - async { - listener - .wait_for(|id| params.page.offset.is_none_or(|offset| *id > offset)) - .await - .ok(); - }, - ) + if !check(&init) { + let pooling = tokio::time::timeout(Duration::from_millis(timeout), async { + listener.wait_for(filter).await.ok(); + }) .await; match pooling { Ok(_) => load().await, @@ -178,6 +169,31 @@ pub async fn history<'a, 'b, T: Send + Unpin>( } } +pub async fn history<T: Send + Unpin>( + db: &PgPool, + id_col: &str, + params: &History, + listen: impl FnOnce() -> watch::Receiver<i64>, + prepare: impl Fn() -> QueryBuilder<'static, Postgres> + Copy, + map: impl Fn(PgRow) -> Result<T, Error> + Send + Copy, +) -> Result<Vec<T>, Error> { + let load = async || page(db, &params.page, id_col, prepare, map).await; + // When going backward there is always at least one transaction or none + let poll = if params.page.limit < 0 { + &Pooling::default() + } else { + &params.pooling + }; + pooling( + poll, + listen, + |id| params.page.offset.is_none_or(|offset| *id > offset), + load, + |init| !init.is_empty(), + ) + .await +} + /* ----- Bind ----- */ pub trait BindHelper { diff --git a/common/taler-api/src/notification.rs b/common/taler-api/src/notification.rs @@ -14,7 +14,7 @@ TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> */ -use std::hash::Hash; +use std::{hash::Hash, sync::Arc}; use dashmap::DashMap; use tokio::sync::watch::{self, Receiver}; @@ -24,12 +24,15 @@ pub mod de; /// Listen for many postgres notification channels using a single connection #[macro_export] macro_rules! notification_listener { - ($pool: expr, $($channel:expr => ($($arg:ident: $type:ty),*) $lambda:block),*$(,)?) => { - { - let mut listener = ::sqlx::postgres::PgListener::connect_with($pool).await?; - listener.listen_all([$($channel,)*]).await?; - loop { - while let Some(notification) = listener.try_recv().await? { + ($pool: expr, $($channel:expr => ($($arg:ident: $type:ty),*) $lambda:block),*$(,)?) => {{ + let mut jitter = ::taler_common::ExpoBackoffDecorr::default(); + loop { + if let ::sqlx::Result::<(), ::sqlx::Error>::Err(e) = async { + let mut listener = ::sqlx::postgres::PgListener::connect_with($pool).await?; + listener.listen_all([$($channel,)*]).await?; + jitter.reset(); + loop { + let notification = listener.recv().await?; tracing::debug!(target: "db-watcher", "db notification: {} - {}", notification.channel(), @@ -37,31 +40,41 @@ macro_rules! notification_listener { ); match notification.channel() { $($channel => { - let ($($arg,)*): ($($type,)*) = - $crate::notification::de::from_str(notification.payload()).unwrap();// TODO error handling - $lambda + match $crate::notification::de::from_str::<($($type,)*)>(notification.payload()) { + Ok(($($arg,)*)) => { + $lambda + } + Err(e) => { + tracing::error!(target: "db-watcher", + "db notification: {} {e} - {}", + notification.channel(), + notification.payload() + ); + } + } }),* - unknown => unreachable!("{}", unknown), + unknown => unreachable!("{unknown}"), } } - // TODO wait before reconnect - } + }.await { + tokio::time::sleep(jitter.backoff()).await; + tracing::error!(target: "db-watcher", "{e}"); + }; } - - } + }} } pub use notification_listener; -#[derive(Default)] +#[derive(Default, Clone)] pub struct NotificationChannel<K: Eq + Hash, V> { - map: DashMap<K, watch::Sender<V>>, + map: Arc<DashMap<K, watch::Sender<V>>>, } impl<K: Eq + Hash, V> NotificationChannel<K, V> { pub fn new() -> Self { Self { - map: DashMap::new(), + map: Arc::new(DashMap::new()), } } } diff --git a/common/taler-api/src/notification/de.rs b/common/taler-api/src/notification/de.rs @@ -1,6 +1,6 @@ /* This file is part of TALER - Copyright (C) 2025 Taler Systems SA + Copyright (C) 2025, 2026 Taler Systems SA TALER is free software; you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software @@ -20,93 +20,33 @@ use serde::{ forward_to_deserialize_any, }; -/// Parse notification message content -pub fn from_str<'de, T: Deserialize<'de>>( - notification: &'de str, -) -> Result<T, serde::de::value::Error> { - T::deserialize(Deserializer(notification)) -} - -macro_rules! gen_fn { - (deserialize_unit_struct, $self:ident, $v:ident, $content:block) => { - gen_fn!(deserialize_unit_struct, $self, (_name: &'static str, $v: V), $content); - }; - (deserialize_newtype_struct, $self:ident, $v:ident, $content:block) => { - gen_fn!(deserialize_newtype_struct, $self, (_name: &'static str, $v: V), $content); - }; - (deserialize_tuple, $self:ident, $v:ident, $content:block) => { - gen_fn!(deserialize_tuple, $self, (_len: usize, $v: V), $content); - }; - (deserialize_tuple_struct, $self:ident, $v:ident, $content:block) => { - gen_fn!(deserialize_tuple_struct, $self, (_name: &'static str, _len: usize, $v: V), $content); - }; - (deserialize_struct, $self:ident, $v:ident, $content:block) => { - gen_fn!(deserialize_struct, $self, (_name: &'static str, _fields: &'static [&'static str], $v: V), $content); - }; - (deserialize_enum, $self:ident, $v:ident, $content:block) => { - gen_fn!(deserialize_enum, $self, (_name: &'static str, _variants: &'static [&'static str], $v: V), $content); - }; - ($fn:ident, $self:ident, $v:ident, $content:block) => { gen_fn!($fn, $self, ($v: V), $content); }; - ($fn:ident, $self:ident, ($($arg:ident: $type:ty),*), $content:block) => { - fn $fn<V: Visitor<'de>>($self$(, $arg: $type)*) -> Result<V::Value, de::value::Error> { - $content - } - }; +/// Deserialize a value `T` from a Postgres notification payload string +pub fn from_str<'de, T: Deserialize<'de>>(notification: &'de str) -> Result<T, de::value::Error> { + T::deserialize(NotifDe(notification)) } macro_rules! forward_to_parse { - ($(($from:ident, $visit_func:ident)),*$(,)?) => { + ($($deserialize_fn:ident => $visit_fn:ident),* $(,)?) => { $( - gen_fn!($from, self, visitor, { - visitor.$visit_func( - self.0 - .parse() - .map_err(|e| Error::custom(format!("{}", e)))?, - ) - }); + fn $deserialize_fn<V: Visitor<'de>>(self, visitor: V) -> Result<V::Value, Self::Error> { + visitor.$visit_fn(self.0.parse().map_err(Error::custom)?) + } )* - } -} - -macro_rules! forward_to_other { - ($(($from:ident, $to:ident)),*$(,)?) => { - $( - gen_fn!($from, self, visitor, { - self.$to(visitor) - }); - )* - } -} - -macro_rules! fail_unsupported { - ($(($from:ident, $ty:expr)),*$(,)?) => { - $( - gen_fn!($from, self, _visitor, { - Err(Error::custom(format!("cannot deserialize non primitive type {}", $ty))) - }); - )* - } + }; } /// Space-separated values deserializer for Postgres notification -/// Support tuples only -struct Deserializer<'de>(&'de str); +struct NotifDe<'de>(&'de str); -impl<'de> serde::de::Deserializer<'de> for Deserializer<'de> { +impl<'de> de::Deserializer<'de> for NotifDe<'de> { type Error = de::value::Error; - fn deserialize_any<V>(self, visitor: V) -> Result<V::Value, Self::Error> - where - V: Visitor<'de>, - { + fn deserialize_any<V: Visitor<'de>>(self, visitor: V) -> Result<V::Value, Self::Error> { let de = SeqDeserializer::new(self.0.split(' ').map(PlainDe)); visitor.visit_seq(de) } - fn deserialize_unit<V>(self, visitor: V) -> Result<V::Value, Self::Error> - where - V: Visitor<'de>, - { + fn deserialize_unit<V: Visitor<'de>>(self, visitor: V) -> Result<V::Value, Self::Error> { PlainDe(self.0).deserialize_unit(visitor) } @@ -117,10 +57,10 @@ impl<'de> serde::de::Deserializer<'de> for Deserializer<'de> { } } -/// Deserializer for any plain value that can be parsed from a string +/// Values deserializer for Postgres notifications struct PlainDe<'de>(&'de str); -impl<'de> serde::de::IntoDeserializer<'de> for PlainDe<'de> { +impl<'de> de::IntoDeserializer<'de> for PlainDe<'de> { type Deserializer = Self; fn into_deserializer(self) -> Self::Deserializer { @@ -128,20 +68,14 @@ impl<'de> serde::de::IntoDeserializer<'de> for PlainDe<'de> { } } -impl<'de> serde::de::Deserializer<'de> for PlainDe<'de> { - type Error = serde::de::value::Error; +impl<'de> de::Deserializer<'de> for PlainDe<'de> { + type Error = de::value::Error; - fn deserialize_str<V>(self, visitor: V) -> Result<V::Value, Self::Error> - where - V: Visitor<'de>, - { + fn deserialize_any<V: Visitor<'de>>(self, visitor: V) -> Result<V::Value, Self::Error> { visitor.visit_borrowed_str(self.0) } - fn deserialize_option<V>(self, visitor: V) -> Result<V::Value, Self::Error> - where - V: Visitor<'de>, - { + fn deserialize_option<V: Visitor<'de>>(self, visitor: V) -> Result<V::Value, Self::Error> { if self.0.is_empty() { visitor.visit_none() } else { @@ -149,72 +83,120 @@ impl<'de> serde::de::Deserializer<'de> for PlainDe<'de> { } } - fn deserialize_unit<V>(self, visitor: V) -> Result<V::Value, Self::Error> - where - V: Visitor<'de>, - { + fn deserialize_unit<V: Visitor<'de>>(self, visitor: V) -> Result<V::Value, Self::Error> { if self.0.is_empty() { visitor.visit_unit() } else { - Err(Error::custom("expected empty string for unit")) + Err(Error::custom("expected empty token for unit type")) } } - forward_to_parse!( - (deserialize_i8, visit_i8), - (deserialize_i16, visit_i16), - (deserialize_i32, visit_i32), - (deserialize_i64, visit_i64), - (deserialize_i128, visit_i128), - (deserialize_u8, visit_u8), - (deserialize_u16, visit_u16), - (deserialize_u32, visit_u32), - (deserialize_u64, visit_u64), - (deserialize_u128, visit_u128), - (deserialize_f32, visit_f32), - (deserialize_f64, visit_f64), - (deserialize_char, visit_char), - (deserialize_bool, visit_bool) - ); - - forward_to_other!( - (deserialize_string, deserialize_str), - (deserialize_any, deserialize_str), - (deserialize_unit_struct, deserialize_unit), - ); - - fail_unsupported!( - (deserialize_bytes, "bytes"), - (deserialize_byte_buf, "bytes"), - (deserialize_seq, "seq"), - (deserialize_tuple_struct, "tuple"), - (deserialize_tuple, "tuple"), - (deserialize_map, "map"), - (deserialize_struct, "struct"), - (deserialize_newtype_struct, "struct"), - (deserialize_enum, "enum"), - (deserialize_identifier, "identifier"), - (deserialize_ignored_any, "any") - ); + fn deserialize_unit_struct<V: Visitor<'de>>( + self, + _name: &'static str, + visitor: V, + ) -> Result<V::Value, Self::Error> { + self.deserialize_unit(visitor) + } + + fn deserialize_newtype_struct<V: Visitor<'de>>( + self, + _name: &'static str, + visitor: V, + ) -> Result<V::Value, Self::Error> { + visitor.visit_newtype_struct(self) + } + + fn deserialize_enum<V: Visitor<'de>>( + self, + name: &'static str, + variants: &'static [&'static str], + visitor: V, + ) -> Result<V::Value, Self::Error> { + use de::IntoDeserializer; + self.0 + .into_deserializer() + .deserialize_enum(name, variants, visitor) + } + + forward_to_parse! { + deserialize_i8 => visit_i8, + deserialize_i16 => visit_i16, + deserialize_i32 => visit_i32, + deserialize_i64 => visit_i64, + deserialize_i128 => visit_i128, + deserialize_u8 => visit_u8, + deserialize_u16 => visit_u16, + deserialize_u32 => visit_u32, + deserialize_u64 => visit_u64, + deserialize_u128 => visit_u128, + deserialize_f32 => visit_f32, + deserialize_f64 => visit_f64, + deserialize_char => visit_char, + deserialize_bool => visit_bool, + } + + forward_to_deserialize_any! { + str string bytes byte_buf seq tuple tuple_struct map struct identifier ignored_any + } } -#[test] -pub fn parse() { - // Parse simple message - assert_eq!((1,), from_str::<(u8,)>("1").unwrap()); - assert_eq!((false,), from_str::<(bool,)>("false").unwrap()); - assert_eq!(("username",), from_str::<(&str,)>("username").unwrap()); - - // Parse composite message - assert_eq!((1, false), from_str::<(u8, bool)>("1 false").unwrap()); - assert_eq!([1, 2, 3, 4], from_str::<[u8; 4]>("1 2 3 4").unwrap()); - - // Parse corner case - assert_eq!((), from_str::<()>("").unwrap()); - assert_eq!((1, None), from_str::<(u8, Option<bool>)>("1 ").unwrap()); - assert_eq!((1, ()), from_str::<(u8, ())>("1 ").unwrap()); - assert_eq!( - (1, (), true), - from_str::<(u8, (), bool)>("1 true").unwrap() - ); +#[cfg(test)] +mod tests { + use super::from_str; + + #[derive(Debug, PartialEq, Eq, serde::Deserialize)] + #[allow(non_camel_case_types)] + enum Enum { + pending, + active, + closed, + } + + #[derive(Debug, PartialEq, Eq, serde::Deserialize)] + struct NewType(u32); + + #[test] + fn parse() { + // Single primitives + assert_eq!((1u8,), from_str::<(u8,)>("1").unwrap()); + assert_eq!((false,), from_str::<(bool,)>("false").unwrap()); + assert_eq!(("hello",), from_str::<(&str,)>("hello").unwrap()); + + // Composite + assert_eq!((1u8, false), from_str::<(u8, bool)>("1 false").unwrap()); + assert_eq!([1u8, 2, 3, 4], from_str::<[u8; 4]>("1 2 3 4").unwrap()); + + // Empty payload → unit + assert_eq!((), from_str::<()>("").unwrap()); + + // Trailing space → None / () + assert_eq!((1u8, None), from_str::<(u8, Option<bool>)>("1 ").unwrap()); + assert_eq!((1u8, ()), from_str::<(u8, ())>("1 ").unwrap()); + + // Double space → empty token in the middle + assert_eq!( + (1u8, (), true), + from_str::<(u8, (), bool)>("1 true").unwrap() + ); + + // Enums matched by variant name + assert_eq!( + (42u32, Enum::active), + from_str::<(u32, Enum)>("42 active").unwrap() + ); + assert_eq!( + (1u8, (), true, Enum::closed), + from_str::<(u8, (), bool, Enum)>("1 true closed").unwrap() + ); + + // Newtype struct forwards to inner type + assert_eq!( + (NewType(7), true), + from_str::<(NewType, bool)>("7 true").unwrap() + ); + + // Unknown enum variant is an error + assert!(from_str::<(Enum,)>("unknown").is_err()); + } } diff --git a/common/taler-api/src/test/db.rs b/common/taler-api/src/test/db.rs @@ -43,7 +43,7 @@ pub async fn notification_listener( pool: PgPool, outgoing_channel: Sender<i64>, incoming_channel: Sender<i64>, -) -> sqlx::Result<()> { +) { notification_listener!(&pool, "outgoing_tx" => (row_id: i64) { outgoing_channel.send_replace(row_id); diff --git a/common/taler-common/src/api_params.rs b/common/taler-common/src/api_params.rs @@ -96,20 +96,48 @@ impl Page { } } +#[serde_as] #[derive(Debug, Clone, Deserialize)] /// <https://docs.taler.net/core/api-common.html#long-polling> -pub struct HistoryParams { - #[serde(flatten)] - pub pagination: PageParams, +pub struct PoolingParams { + #[serde_as(as = "Option<DisplayFromStr>")] #[serde(alias = "long_poll_ms")] pub timeout_ms: Option<u64>, } -impl HistoryParams { +#[derive(Debug, Default)] +pub struct Pooling { + pub timeout_ms: Option<u64>, +} + +impl PoolingParams { pub const MAX_TIMEOUT_MS: u64 = 60 * 60 * 10; // 1H + pub fn check(self) -> Result<Pooling, ParamsErr> { + Self::check_custom(self, Self::MAX_TIMEOUT_MS) + } + + pub fn check_custom(self, max_timeout_ms: u64) -> Result<Pooling, ParamsErr> { + let timeout_ms = self.timeout_ms.map(|it| it.min(max_timeout_ms)); + Ok(Pooling { timeout_ms }) + } +} + +#[derive(Debug, Clone, Deserialize)] +pub struct HistoryParams { + #[serde(flatten)] + pub pagination: PageParams, + #[serde(flatten)] + pub pooling: PoolingParams, +} + +impl HistoryParams { pub fn check(self) -> Result<History, ParamsErr> { - Self::check_custom(self, PageParams::MAX_PAGE_SIZE, Self::MAX_TIMEOUT_MS) + Self::check_custom( + self, + PageParams::MAX_PAGE_SIZE, + PoolingParams::MAX_TIMEOUT_MS, + ) } pub fn check_custom( @@ -117,10 +145,9 @@ impl HistoryParams { max_page_size: i64, max_timeout_ms: u64, ) -> Result<History, ParamsErr> { - let timeout_ms = self.timeout_ms.map(|it| it.min(max_timeout_ms)); Ok(History { page: self.pagination.check_custom(max_page_size)?, - timeout_ms, + pooling: self.pooling.check_custom(max_timeout_ms)?, }) } } @@ -128,7 +155,7 @@ impl HistoryParams { #[derive(Debug, Default)] pub struct History { pub page: Page, - pub timeout_ms: Option<u64>, + pub pooling: Pooling, } #[derive(Debug, Clone, Deserialize)] diff --git a/common/taler-test-utils/src/routine.rs b/common/taler-test-utils/src/routine.rs @@ -48,7 +48,7 @@ use crate::{ server::{TestResponse, TestServer as _}, }; -pub trait Page: DeserializeOwned { +pub trait Page: DeserializeOwned + Debug { fn ids(&self) -> Vec<i64>; } @@ -136,7 +136,7 @@ pub async fn routine_pagination<T: Page>( assert_history(&format!("limit=-10&{}", id - 4), 10).await; } -async fn assert_time<R: Debug>(range: std::ops::Range<u128>, task: impl Future<Output = R>) { +pub async fn assert_time<R: Debug>(range: std::ops::Range<u128>, task: impl Future<Output = R>) { let start = Instant::now(); task.await; let elapsed = start.elapsed().as_millis(); @@ -277,7 +277,7 @@ impl TestResponse { let params = self.query::<PageParams>().check().unwrap(); // testing the size is like expected - assert_eq!(size, page.len(), "bad page length: {page:?}"); + assert_eq!(size, page.len(), "bad page length: {page:?}\n{body:?}"); if params.limit < 0 { // testing that the first id is at most the 'offset' query param. assert!( diff --git a/taler-cyclos/src/bin/cyclos-harness.rs b/taler-cyclos/src/bin/cyclos-harness.rs @@ -27,7 +27,7 @@ use taler_build::long_version; use taler_common::{ CommonArgs, api_common::{EddsaPublicKey, HashCode, ShortHashCode}, - api_params::{History, Page}, + api_params::{History, Page, Pooling}, api_wire::{IncomingBankTransaction, TransferState}, config::Config, taler_main, @@ -165,7 +165,7 @@ impl<'a> Harness<'a> { limit: -1, offset: None, }, - timeout_ms: None, + pooling: Pooling { timeout_ms: None }, }, &self.currency, &self.root, @@ -208,8 +208,8 @@ impl<'a> Harness<'a> { async fn transfer_id(&self, transfer_id: u64) -> i64 { sqlx::query( - "SELECT transfer_id - FROM transfer + "SELECT transfer_id + FROM transfer JOIN initiated USING (initiated_id) JOIN tx_out USING (tx_out_id) WHERE initiated_id=$1", diff --git a/taler-cyclos/src/db.rs b/taler-cyclos/src/db.rs @@ -71,7 +71,7 @@ pub async fn notification_listener( taler_in_channel: Sender<i64>, out_channel: Sender<i64>, taler_out_channel: Sender<i64>, -) -> sqlx::Result<()> { +) { taler_api::notification::notification_listener!(&pool, "tx_in" => (row_id: i64) { in_channel.send_replace(row_id); diff --git a/taler-magnet-bank/src/bin/magnet-bank-harness.rs b/taler-magnet-bank/src/bin/magnet-bank-harness.rs @@ -27,7 +27,7 @@ use taler_build::long_version; use taler_common::{ CommonArgs, api_common::{EddsaPublicKey, HashCode, ShortHashCode}, - api_params::{History, Page}, + api_params::{History, Page, Pooling}, api_wire::{IncomingBankTransaction, TransferState}, config::Config, db::{dbinit, pool}, @@ -188,7 +188,7 @@ impl<'a> Harness<'a> { limit: -1, offset: None, }, - timeout_ms: None, + pooling: Pooling { timeout_ms: None }, }, dummy_listen, ) diff --git a/taler-magnet-bank/src/db.rs b/taler-magnet-bank/src/db.rs @@ -68,7 +68,7 @@ pub async fn notification_listener( taler_in_channel: Sender<i64>, out_channel: Sender<i64>, taler_out_channel: Sender<i64>, -) -> sqlx::Result<()> { +) { taler_api::notification::notification_listener!(&pool, "tx_in" => (row_id: i64) { in_channel.send_replace(row_id);