commit f63c735f81d8e85288d911db085ea26fcda28ad7
parent 39ad52a6675e9818609d453005fb79a7e842dd61
Author: Antoine A <>
Date: Sat, 16 May 2026 13:43:19 +0200
common: improve testing and pg notifications
Diffstat:
9 files changed, 341 insertions(+), 266 deletions(-)
diff --git a/common/taler-api/src/db.rs b/common/taler-api/src/db.rs
@@ -38,7 +38,7 @@ use taler_common::{
utils::date_to_utc_ts,
},
};
-use tokio::sync::watch::Receiver;
+use tokio::sync::watch::{self};
use url::Url;
pub type PgQueryBuilder<'b> = QueryBuilder<'b, Postgres>;
@@ -140,14 +140,14 @@ pub async fn page<'a, 'b, R: Send + Unpin>(
})
}
-pub async fn history<'a, 'b, R: Send + Unpin>(
+pub async fn history<'a, 'b, T: Send + Unpin>(
pool: &PgPool,
id_col: &str,
params: &History,
- listen: impl FnOnce() -> Receiver<i64>,
+ listen: impl FnOnce() -> watch::Receiver<i64>,
prepare: impl Fn() -> QueryBuilder<'a, Postgres> + Copy,
- map: impl Fn(PgRow) -> Result<R, Error> + Send + Copy,
-) -> Result<Vec<R>, Error> {
+ map: impl Fn(PgRow) -> Result<T, Error> + Send + Copy,
+) -> Result<Vec<T>, Error> {
let load = async || page(pool, ¶ms.page, id_col, prepare, map).await;
// When going backward there is always at least one transaction or none
diff --git a/common/taler-api/src/error.rs b/common/taler-api/src/error.rs
@@ -228,8 +228,8 @@ pub fn not_implemented() -> ApiError {
ApiError::new(ErrorCode::END).with_status(StatusCode::NOT_IMPLEMENTED)
}
-pub fn unauthorized() -> ApiError {
- ApiError::new(ErrorCode::GENERIC_UNAUTHORIZED)
+pub fn unauthorized(hint: impl Display) -> ApiError {
+ ApiError::new(ErrorCode::GENERIC_UNAUTHORIZED).with_hint(hint)
}
pub fn forbidden(hint: impl Display) -> ApiError {
diff --git a/common/taler-api/src/notification.rs b/common/taler-api/src/notification.rs
@@ -14,7 +14,7 @@
TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/>
*/
-use std::{hash::Hash, sync::Arc};
+use std::hash::Hash;
use dashmap::DashMap;
use tokio::sync::watch::{self, Receiver};
@@ -53,49 +53,49 @@ macro_rules! notification_listener {
pub use notification_listener;
-type CountedNotify<T> = watch::Sender<Option<T>>;
-
#[derive(Default)]
pub struct NotificationChannel<K: Eq + Hash, V> {
- map: Arc<DashMap<K, CountedNotify<V>>>,
-}
-
-pub struct Listener<K: Eq + Hash + Clone, V> {
- map: Arc<DashMap<K, CountedNotify<V>>>,
- channel: watch::Receiver<Option<V>>,
- key: K,
+ map: DashMap<K, watch::Sender<V>>,
}
-impl<K: Eq + Hash + Clone, V> Listener<K, V> {
- pub async fn wait_for(mut self, filter: impl Fn(&V) -> bool) {
- self.channel
- .wait_for(|it| it.as_ref().map(&filter).unwrap_or(false))
- .await
- .ok(); // If the channel is closed we cannot wait efficiently
+impl<K: Eq + Hash, V> NotificationChannel<K, V> {
+ pub fn new() -> Self {
+ Self {
+ map: DashMap::new(),
+ }
}
}
-impl<K: Eq + Hash + Clone, V> Drop for Listener<K, V> {
- fn drop(&mut self) {
+impl<K: Eq + Hash + Clone, V: Default> NotificationChannel<K, V> {
+ /// Subscribe to events for a specific username.
+ /// Creates the channel lazily on first subscriber.
+ pub fn subscribe(&self, key: K) -> watch::Receiver<V> {
self.map
- .remove_if(&self.key, |_, it| it.receiver_count() == 1);
+ .entry(key)
+ .or_insert_with(|| {
+ let (sender, _) = watch::channel(V::default());
+ sender
+ })
+ .subscribe()
}
-}
-impl<K: Eq + Hash + Clone, V> NotificationChannel<K, V> {
- pub fn listener(&self, key: K) -> Listener<K, V> {
- let entry = self.map.entry(key.clone()).or_insert_with(|| {
- let (sender, _) = watch::channel(None);
- sender
- });
- Listener {
- map: self.map.clone(),
- channel: entry.subscribe(),
- key,
+ /// Dispatch a notification to the right user's channel.
+ pub fn dispatch(&self, key: &K, event: V) {
+ if let Some(tx) = self.map.get(key) {
+ tx.send(event).ok();
+ /*// send_if_modified avoids waking receivers if nothing changed
+ let _ = tx.send_if_modified(|current| {
+ *current = Some(event.clone());
+ true
+ });*/
}
}
-}
+ /// Prune channels where all receivers have been dropped.
+ pub fn prune(&self) {
+ self.map.retain(|_, tx| tx.receiver_count() > 0);
+ }
+}
pub fn dummy_listen<T: Default>() -> Receiver<T> {
tokio::sync::watch::channel(T::default()).1
}
@@ -108,32 +108,38 @@ async fn channel_gc() {
assert_eq!(0, channel.map.len());
// Clean in future
- let listener = channel.listener("test");
+ let mut listener = channel.subscribe("test");
assert_eq!(1, channel.map.len());
- tokio::time::timeout(Duration::from_millis(0), listener.wait_for(|it| it == 42))
- .await
- .unwrap_err();
+ tokio::time::timeout(Duration::from_millis(1), async move {
+ listener.wait_for(|it| it == 42).await.unwrap();
+ })
+ .await
+ .unwrap_err();
+ channel.prune();
assert_eq!(0, channel.map.len());
// Clean on drop
- let first = channel.listener("test");
- let second = channel.listener("test");
+ let mut first = channel.subscribe("test");
+ let second = channel.subscribe("test");
assert_eq!(1, channel.map.len());
- tokio::time::timeout(Duration::from_millis(0), first.wait_for(|it| it == 42))
- .await
- .unwrap_err();
+ tokio::time::timeout(Duration::from_millis(1), async move {
+ first.wait_for(|it| it == 42).await.unwrap();
+ })
+ .await
+ .unwrap_err();
assert_eq!(1, channel.map.len());
drop(second);
+ channel.prune();
assert_eq!(0, channel.map.len());
}
#[tokio::test]
async fn wake() {
let channel = NotificationChannel::default();
- let listener = channel.listener("test");
- let task = tokio::spawn(listener.wait_for(|it| *it == 42));
- channel.map.entry("test").and_modify(|it| {
- it.send(Some(42)).unwrap();
+ let mut listener = channel.subscribe("test");
+ let task = tokio::spawn(async move {
+ listener.wait_for(|it| *it == 42).await.unwrap();
});
+ channel.dispatch(&"test", 42);
task.await.unwrap();
}
diff --git a/common/taler-api/src/test.rs b/common/taler-api/src/test.rs
@@ -27,6 +27,7 @@ use taler_test_utils::{
revenue_routine, routine_pagination, transfer_routine,
},
server::TestServer,
+ tasks,
};
use tokio::sync::watch::Sender;
@@ -194,22 +195,23 @@ async fn transfer() {
#[tokio::test]
async fn outgoing_history() {
let (server, _) = setup().await;
+ let server = &server;
routine_pagination::<OutgoingHistory>(
- &server,
+ server,
"/taler-wire-gateway/history/outgoing",
- async |i| {
+ tasks!({
server
.post("/taler-wire-gateway/transfer")
.json(json!({
"request_uid": HashCode::rand(),
- "amount": amount(format!("EUR:0.0{i}")),
+ "amount": amount("EUR:1"),
"exchange_base_url": url("http://exchange.taler"),
"wtid": ShortHashCode::rand(),
"credit_account": PAYTO.clone(),
}))
.await
.assert_ok_json::<TransferResponse>();
- },
+ }),
)
.await;
}
diff --git a/common/taler-test-utils/src/db.rs b/common/taler-test-utils/src/db.rs
@@ -61,11 +61,21 @@ async fn test_db() -> PgConnectOptions {
let row: Option<(String, bool)> = sqlx::query_as(
"
SELECT
- 'taler_rust_test_' || s.id,
- EXISTS (SELECT FROM pg_database WHERE datname = 'taler_rust_test_' || s.id)
- FROM generate_series(1, 1000) AS s(id)
- WHERE pg_try_advisory_lock(s.id)
- LIMIT 1
+ v.db_name,
+ EXISTS (
+ SELECT 1
+ FROM pg_catalog.pg_database
+ WHERE datname = v.db_name
+ ) AS already_exists
+ FROM generate_series(0, 1000) AS id
+ CROSS JOIN LATERAL (
+ SELECT CASE
+ WHEN id = 0 THEN 'taler_rust_test'
+ ELSE 'taler_rust_test_' || id
+ END AS db_name
+ ) AS v
+ WHERE pg_try_advisory_lock(id)
+ LIMIT 1;
",
)
.fetch_optional(&mut conn)
diff --git a/common/taler-test-utils/src/routine.rs b/common/taler-test-utils/src/routine.rs
@@ -101,7 +101,7 @@ pub async fn latest_id<T: Page>(router: &Router, url: &str) -> i64 {
pub async fn routine_pagination<T: Page>(
server: &Router,
url: &str,
- mut register: impl AsyncFnMut(usize) -> (),
+ mut register: Tasks<impl AsyncFnMut(usize)>,
) {
// Check supported
if !server.get(url).await.is_implemented() {
@@ -119,7 +119,7 @@ pub async fn routine_pagination<T: Page>(
let latest_id = async || latest_id::<T>(server, url).await;
for i in 0..20 {
- register(i).await;
+ (register.lambda)(i).await;
}
let id = latest_id().await;
@@ -148,10 +148,8 @@ async fn assert_time<R: Debug>(range: std::ops::Range<u128>, task: impl Future<O
pub async fn routine_history<T: Page>(
server: &Router,
url: &str,
- nb_register: usize,
- mut register: impl AsyncFnMut(usize) -> (),
- nb_ignore: usize,
- mut ignore: impl AsyncFnMut(usize) -> (),
+ mut register: Tasks<impl AsyncFnMut(usize)>,
+ mut ignore: Tasks<impl AsyncFnMut(usize)>,
) {
// Check history is following specs
macro_rules! assert_history {
@@ -170,16 +168,18 @@ pub async fn routine_history<T: Page>(
// Check error when no transactions
assert_history!("limit=7".to_owned(), 0).await;
- let mut register_iter = (0..nb_register).peekable();
- let mut ignore_iter = (0..nb_ignore).peekable();
+ let mut register_iter = (0..register.len).peekable();
+ let mut ignore_iter = (0..ignore.len).peekable();
while register_iter.peek().is_some() || ignore_iter.peek().is_some() {
if let Some(idx) = register_iter.next() {
- register(idx).await
+ (register.lambda)(idx).await
}
if let Some(idx) = ignore_iter.next() {
- ignore(idx).await
+ (ignore.lambda)(idx).await
}
}
+ let nb_register = register.len;
+ let nb_ignore = ignore.len;
let nb_total = nb_register + nb_ignore;
// Check ignored
@@ -226,12 +226,12 @@ pub async fn routine_history<T: Page>(
),
async {
sleep(Duration::from_millis(100)).await;
- register(0).await
+ (register.lambda)(0).await
}
);
// Test triggers
- for i in 0..nb_register {
+ for i in 0..register.len {
let id = latest_id().await;
tokio::join!(
// Check polling succeed
@@ -241,7 +241,7 @@ pub async fn routine_history<T: Page>(
),
async {
sleep(Duration::from_millis(100)).await;
- register(i).await
+ (register.lambda)(i).await
}
);
}
@@ -256,8 +256,8 @@ pub async fn routine_history<T: Page>(
),
async {
sleep(Duration::from_millis(100)).await;
- for i in 0..nb_ignore {
- ignore(i).await
+ for i in 0..ignore.len {
+ (ignore.lambda)(i).await
}
}
);
@@ -549,19 +549,23 @@ pub async fn transfer_routine(
}
// Pagination test
- routine_pagination::<TransferList>(server, "/taler-wire-gateway/transfers", async |i| {
- server
- .post("/taler-wire-gateway/transfer")
- .json(json!({
- "request_uid": HashCode::rand(),
- "amount": amount(format!("{currency}:0.0{i}")),
- "exchange_base_url": url("http://exchange.taler"),
- "wtid": ShortHashCode::rand(),
- "credit_account": credit_account,
- }))
- .await
- .assert_ok_json::<TransferResponse>();
- })
+ routine_pagination::<TransferList>(
+ server,
+ "/taler-wire-gateway/transfers",
+ crate::tasks!({
+ server
+ .post("/taler-wire-gateway/transfer")
+ .json(json!({
+ "request_uid": HashCode::rand(),
+ "amount": amount(format!("{currency}:0.1")),
+ "exchange_base_url": url("http://exchange.taler"),
+ "wtid": ShortHashCode::rand(),
+ "credit_account": credit_account,
+ }))
+ .await
+ .assert_ok_json::<TransferResponse>();
+ }),
+ )
.await;
}
}
@@ -671,39 +675,81 @@ async fn add_incoming_routine(
.assert_error(ErrorCode::GENERIC_JSON_INVALID);
}
+pub struct Tasks<F: AsyncFnMut(usize)> {
+ pub len: usize,
+ pub lambda: F,
+}
+
+#[macro_export]
+macro_rules! tasks {
+ ( $( $(if $cond:expr =>)? $body:block ),* $(,)? ) => {{
+ // Evaluate every condition exactly once, up front
+ let conditions = [ $( true $( && ($cond) )? ),* ];
+
+ // Compact length = number of true conditions
+ let active_len = conditions.iter().filter(|&&c| c).count();
+
+ $crate::routine::Tasks {
+ len: active_len,
+ lambda: async move |i: usize| {
+ if active_len == 0 {
+ return
+ }
+ let i = i % active_len;
+ let mut cond_idx = 0usize;
+ let mut current_idx = 0usize;
+
+ $(
+ // If this specific block's condition passed...
+ if conditions[cond_idx] {
+ // ...and it matches the requested execution index
+ if i == current_idx {
+ (async $body).await;
+ return;
+ }
+ current_idx += 1;
+ }
+ cond_idx += 1;
+ )*
+
+ let _ = (&mut current_idx, &mut cond_idx); // suppress lints
+ unreachable!("Index {i} out of bounds for {active_len} active tasks");
+ }
+ }
+ }};
+}
+
/// Test standard behavior of the revenue endpoints
pub async fn revenue_routine(server: &Router, debit_acount: &PaytoURI, kyc: bool) {
let currency = &get_currency(server).await;
-
routine_history::<RevenueIncomingHistory>(
server,
"/taler-revenue/history",
- 2,
- async |i| {
- if i % 2 == 0 || !kyc {
- server
- .post("/taler-wire-gateway/admin/add-incoming")
- .json(json!({
- "amount": format!("{currency}:0.0{i}"),
- "reserve_pub": EddsaPublicKey::rand(),
- "debit_account": debit_acount,
- }))
- .await
- .assert_ok_json::<TransferResponse>();
- } else {
- server
- .post("/taler-wire-gateway/admin/add-kycauth")
- .json(json!({
- "amount": format!("{currency}:0.0{i}"),
- "account_pub": EddsaPublicKey::rand(),
- "debit_account": debit_acount,
- }))
- .await
- .assert_ok_json::<TransferResponse>();
+ tasks!(
+ {
+ server
+ .post("/taler-wire-gateway/admin/add-incoming")
+ .json(json!({
+ "amount": format!("{currency}:1"),
+ "reserve_pub": EddsaPublicKey::rand(),
+ "debit_account": debit_acount,
+ }))
+ .await
+ .assert_ok_json::<TransferResponse>();
+ },
+ if kyc => {
+ server
+ .post("/taler-wire-gateway/admin/add-kycauth")
+ .json(json!({
+ "amount": format!("{currency}:2"),
+ "account_pub": EddsaPublicKey::rand(),
+ "debit_account": debit_acount,
+ }))
+ .await
+ .assert_ok_json::<TransferResponse>();
}
- },
- 0,
- async |_| {},
+ ),
+ tasks!(),
)
.await;
}
@@ -713,147 +759,145 @@ pub async fn in_history_routine(server: &Router, debit_acount: &PaytoURI, kyc: b
let currency = &get_currency(server).await;
// History
// TODO check non taler some are ignored
- let mut key_pair = Ed25519KeyPair::generate().unwrap();
- let len = if kyc { 6 } else { 3 };
+ let mut key = Ed25519KeyPair::generate().unwrap();
+
+ let tasks = tasks!(
+ {
+ server
+ .post("/taler-wire-gateway/admin/add-incoming")
+ .json(json!({
+ "amount": format!("{currency}:1"),
+ "reserve_pub": EddsaPublicKey::rand(),
+ "debit_account": debit_acount,
+ }))
+ .await
+ .assert_ok_json::<TransferResponse>();
+ },
+ {
+ key = Ed25519KeyPair::generate().unwrap();
+ let auth_pub = EddsaPublicKey::try_from(key.public_key().as_ref()).unwrap();
+ let reserve_pub = EddsaPublicKey::rand();
+ let amount = format!("{currency}:2");
+ server
+ .post("/taler-prepared-transfer/registration")
+ .json(json!({
+ "credit_amount": amount,
+ "type": "reserve",
+ "alg": "EdDSA",
+ "account_pub": reserve_pub,
+ "authorization_pub": auth_pub,
+ "authorization_sig": eddsa_sign(&key, reserve_pub.as_ref()),
+ "recurrent": true
+ }))
+ .await
+ .assert_ok_json::<RegistrationResponse>();
+ server
+ .post("/taler-wire-gateway/admin/add-mapped")
+ .json(json!({
+ "amount": amount,
+ "authorization_pub": auth_pub,
+ "debit_account": debit_acount,
+ }))
+ .await
+ .assert_ok_json::<TransferResponse>();
+ server
+ .post("/taler-wire-gateway/admin/add-mapped")
+ .json(json!({
+ "amount": amount,
+ "authorization_pub": auth_pub,
+ "debit_account": debit_acount,
+ }))
+ .await
+ .assert_ok_json::<TransferResponse>();
+ },
+ {
+ let auth_pub = EddsaPublicKey::try_from(key.public_key().as_ref()).unwrap();
+ let reserve_pub = EddsaPublicKey::rand();
+ server
+ .post("/taler-prepared-transfer/registration")
+ .json(json!({
+ "credit_amount": format!("{currency}:3"),
+ "type": "reserve",
+ "alg": "EdDSA",
+ "account_pub": reserve_pub,
+ "authorization_pub": auth_pub,
+ "authorization_sig": eddsa_sign(&key, reserve_pub.as_ref()),
+ "recurrent": true
+ }))
+ .await
+ .assert_ok_json::<RegistrationResponse>();
+ },
+ if kyc => {
+ server
+ .post("/taler-wire-gateway/admin/add-kycauth")
+ .json(json!({
+ "amount": format!("{currency}:4"),
+ "account_pub": EddsaPublicKey::rand(),
+ "debit_account": debit_acount,
+ }))
+ .await
+ .assert_ok_json::<TransferResponse>();
+ },
+ if kyc => {
+ key = Ed25519KeyPair::generate().unwrap();
+ let auth_pub = EddsaPublicKey::try_from(key.public_key().as_ref()).unwrap();
+ let account_pub = EddsaPublicKey::rand();
+ let amount = format!("{currency}:5");
+ server
+ .post("/taler-prepared-transfer/registration")
+ .json(json!({
+ "credit_amount": amount,
+ "type": "kyc",
+ "alg": "EdDSA",
+ "account_pub": account_pub,
+ "authorization_pub": auth_pub,
+ "authorization_sig": eddsa_sign(&key, account_pub.as_ref()),
+ "recurrent": true
+ }))
+ .await
+ .assert_ok_json::<RegistrationResponse>();
+ server
+ .post("/taler-wire-gateway/admin/add-mapped")
+ .json(json!({
+ "amount": amount,
+ "authorization_pub": auth_pub,
+ "debit_account": debit_acount,
+ }))
+ .await
+ .assert_ok_json::<TransferResponse>();
+ server
+ .post("/taler-wire-gateway/admin/add-mapped")
+ .json(json!({
+ "amount": amount,
+ "authorization_pub": auth_pub,
+ "debit_account": debit_acount,
+ }))
+ .await
+ .assert_ok_json::<TransferResponse>();
+ },
+ if kyc => {
+ let auth_pub = EddsaPublicKey::try_from(key.public_key().as_ref()).unwrap();
+ let account_pub = EddsaPublicKey::rand();
+ server
+ .post("/taler-prepared-transfer/registration")
+ .json(json!({
+ "credit_amount": format!("{currency}:6"),
+ "type": "kyc",
+ "alg": "EdDSA",
+ "account_pub": account_pub,
+ "authorization_pub": auth_pub,
+ "authorization_sig": eddsa_sign(&key, account_pub.as_ref()),
+ "recurrent": true
+ }))
+ .await
+ .assert_ok_json::<RegistrationResponse>();
+ }
+ );
routine_history::<IncomingHistory>(
server,
"/taler-wire-gateway/history/incoming",
- len,
- async |i| match i % len {
- 0 => {
- server
- .post("/taler-wire-gateway/admin/add-incoming")
- .json(json!({
- "amount": format!("{currency}:0.0{i}"),
- "reserve_pub": EddsaPublicKey::rand(),
- "debit_account": debit_acount,
- }))
- .await
- .assert_ok_json::<TransferResponse>();
- }
- 1 => {
- key_pair = Ed25519KeyPair::generate().unwrap();
- let auth_pub = EddsaPublicKey::try_from(key_pair.public_key().as_ref()).unwrap();
- let reserve_pub = EddsaPublicKey::rand();
- let amount = format!("{currency}:0.0{i}");
- server
- .post("/taler-prepared-transfer/registration")
- .json(json!({
- "credit_amount": amount,
- "type": "reserve",
- "alg": "EdDSA",
- "account_pub": reserve_pub,
- "authorization_pub": auth_pub,
- "authorization_sig": eddsa_sign(&key_pair, reserve_pub.as_ref()),
- "recurrent": true
- }))
- .await
- .assert_ok_json::<RegistrationResponse>();
- server
- .post("/taler-wire-gateway/admin/add-mapped")
- .json(json!({
- "amount": amount,
- "authorization_pub": auth_pub,
- "debit_account": debit_acount,
- }))
- .await
- .assert_ok_json::<TransferResponse>();
- server
- .post("/taler-wire-gateway/admin/add-mapped")
- .json(json!({
- "amount": amount,
- "authorization_pub": auth_pub,
- "debit_account": debit_acount,
- }))
- .await
- .assert_ok_json::<TransferResponse>();
- }
- 2 => {
- let auth_pub = EddsaPublicKey::try_from(key_pair.public_key().as_ref()).unwrap();
- let reserve_pub = EddsaPublicKey::rand();
- server
- .post("/taler-prepared-transfer/registration")
- .json(json!({
- "credit_amount": format!("{currency}:0.0{i}"),
- "type": "reserve",
- "alg": "EdDSA",
- "account_pub": reserve_pub,
- "authorization_pub": auth_pub,
- "authorization_sig": eddsa_sign(&key_pair, reserve_pub.as_ref()),
- "recurrent": true
- }))
- .await
- .assert_ok_json::<RegistrationResponse>();
- }
- 3 => {
- server
- .post("/taler-wire-gateway/admin/add-kycauth")
- .json(json!({
- "amount": format!("{currency}:0.0{i}"),
- "account_pub": EddsaPublicKey::rand(),
- "debit_account": debit_acount,
- }))
- .await
- .assert_ok_json::<TransferResponse>();
- }
- 4 => {
- key_pair = Ed25519KeyPair::generate().unwrap();
- let auth_pub = EddsaPublicKey::try_from(key_pair.public_key().as_ref()).unwrap();
- let account_pub = EddsaPublicKey::rand();
- let amount = format!("{currency}:0.0{i}");
- server
- .post("/taler-prepared-transfer/registration")
- .json(json!({
- "credit_amount": amount,
- "type": "kyc",
- "alg": "EdDSA",
- "account_pub": account_pub,
- "authorization_pub": auth_pub,
- "authorization_sig": eddsa_sign(&key_pair, account_pub.as_ref()),
- "recurrent": true
- }))
- .await
- .assert_ok_json::<RegistrationResponse>();
- server
- .post("/taler-wire-gateway/admin/add-mapped")
- .json(json!({
- "amount": amount,
- "authorization_pub": auth_pub,
- "debit_account": debit_acount,
- }))
- .await
- .assert_ok_json::<TransferResponse>();
- server
- .post("/taler-wire-gateway/admin/add-mapped")
- .json(json!({
- "amount": amount,
- "authorization_pub": auth_pub,
- "debit_account": debit_acount,
- }))
- .await
- .assert_ok_json::<TransferResponse>();
- }
- 5 => {
- let auth_pub = EddsaPublicKey::try_from(key_pair.public_key().as_ref()).unwrap();
- let account_pub = EddsaPublicKey::rand();
- server
- .post("/taler-prepared-transfer/registration")
- .json(json!({
- "credit_amount": format!("{currency}:0.0{i}"),
- "type": "kyc",
- "alg": "EdDSA",
- "account_pub": account_pub,
- "authorization_pub": auth_pub,
- "authorization_sig": eddsa_sign(&key_pair, account_pub.as_ref()),
- "recurrent": true
- }))
- .await
- .assert_ok_json::<RegistrationResponse>();
- }
- nb => unreachable!("unexpected state {nb}"),
- },
- 0,
- async |_| {},
+ tasks,
+ tasks!(),
)
.await;
}
diff --git a/common/taler-test-utils/src/server.rs b/common/taler-test-utils/src/server.rs
@@ -269,6 +269,11 @@ impl TestResponse {
}
#[track_caller]
+ pub fn assert_not_implemented(&self) {
+ self.assert_status(StatusCode::NOT_IMPLEMENTED);
+ }
+
+ #[track_caller]
pub fn assert_error(&self, error_code: ErrorCode) {
self.assert_error_status(
error_code,
diff --git a/taler-cyclos/src/api.rs b/taler-cyclos/src/api.rs
@@ -330,7 +330,10 @@ impl PreparedTransfer for CyclosApi {
#[cfg(test)]
mod test {
- use std::sync::{Arc, LazyLock};
+ use std::sync::{
+ Arc, LazyLock,
+ atomic::{AtomicI64, Ordering},
+ };
use compact_str::CompactString;
use jiff::Timestamp;
@@ -356,6 +359,7 @@ mod test {
revenue_routine, routine_pagination, transfer_routine,
},
server::TestServer as _,
+ tasks,
};
use crate::{
@@ -410,19 +414,18 @@ mod test {
#[tokio::test]
async fn outgoing_history() {
let (server, pool) = setup().await;
+ static CODE: AtomicI64 = AtomicI64::new(0);
+
routine_pagination::<OutgoingHistory>(
&server,
"/taler-wire-gateway/history/outgoing",
- async |i| {
+ tasks!({
+ let i = CODE.fetch_add(1, Ordering::Relaxed);
db::register_tx_out(
&mut pool.acquire().await.unwrap(),
&db::TxOut {
- transfer_id: i as i64,
- tx_id: if i % 2 == 0 {
- Some((i % 2) as i64)
- } else {
- None
- },
+ transfer_id: i,
+ tx_id: if i % 2 == 0 { Some(i % 2) } else { None },
amount: decimal("10"),
subject: "subject".to_owned(),
creditor_id: 31000163100000000,
@@ -434,7 +437,7 @@ mod test {
)
.await
.unwrap();
- },
+ }),
)
.await;
}
@@ -460,7 +463,7 @@ mod test {
async fn check_in(pool: &PgPool) -> Vec<Status> {
sqlx::query(
"
- SELECT pending_recurrent_in.authorization_pub IS NOT NULL, bounced.tx_in_id IS NOT NULL, type, metadata
+ SELECT pending_recurrent_in.authorization_pub IS NOT NULL, bounced.tx_in_id IS NOT NULL, type, metadata
FROM tx_in
LEFT JOIN taler_in USING (tx_in_id)
LEFT JOIN pending_recurrent_in USING (tx_in_id)
diff --git a/taler-magnet-bank/src/api.rs b/taler-magnet-bank/src/api.rs
@@ -303,7 +303,10 @@ impl PreparedTransfer for MagnetApi {
#[cfg(test)]
mod test {
- use std::sync::{Arc, LazyLock};
+ use std::sync::{
+ Arc, LazyLock,
+ atomic::{AtomicU64, Ordering},
+ };
use jiff::{Timestamp, Zoned};
use sqlx::{PgPool, Row as _, postgres::PgRow};
@@ -328,6 +331,7 @@ mod test {
revenue_routine, routine_pagination, transfer_routine,
},
server::TestServer,
+ tasks,
};
use crate::{
@@ -387,16 +391,17 @@ mod test {
#[tokio::test]
async fn outgoing_history() {
let (server, pool) = setup().await;
+ static CODE: AtomicU64 = AtomicU64::new(0);
routine_pagination::<OutgoingHistory>(
&server,
"/taler-wire-gateway/history/outgoing",
- async |i| {
+ tasks!({
let mut conn = pool.acquire().await.unwrap();
let now = Zoned::now().date();
db::register_tx_out(
&mut conn,
&db::TxOut {
- code: i as u64,
+ code: CODE.fetch_add(1, Ordering::Relaxed),
amount: amount("EUR:10"),
subject: "subject".into(),
creditor: PAYTO.clone(),
@@ -408,7 +413,7 @@ mod test {
)
.await
.unwrap();
- },
+ }),
)
.await;
}