taler-rust

GNU Taler code in Rust. Largely core banking integrations.
Log | Files | Refs | Submodules | README | LICENSE

commit 0cddc03a19bff84b94b885ee4f4ed29c7074e7f0
parent 95dd8e8fda8020e68023df935207272eefbf1c26
Author: Antoine A <>
Date:   Sat, 21 Mar 2026 14:09:14 +0100

common: retry on postgres serialization error

Diffstat:
Mcommon/taler-api/src/db.rs | 82+++++++++++++++++++++++++++++++++++++++++++++++++++++++------------------------
Mcommon/taler-api/tests/common/db.rs | 127++++++++++++++++++++++++++++++++++++++++++-------------------------------------
Mcommon/taler-api/tests/common/mod.rs | 2+-
Mtaler-cyclos/src/db.rs | 610+++++++++++++++++++++++++++++++++++++++----------------------------------------
Mtaler-magnet-bank/src/db.rs | 687+++++++++++++++++++++++++++++++++++++++----------------------------------------
5 files changed, 766 insertions(+), 742 deletions(-)

diff --git a/common/taler-api/src/db.rs b/common/taler-api/src/db.rs @@ -22,8 +22,7 @@ use jiff::{ tz::TimeZone, }; use sqlx::{ - Decode, Error, PgExecutor, PgPool, QueryBuilder, Type, error::BoxDynError, postgres::PgRow, - query::Query, + Decode, Error, PgPool, QueryBuilder, Type, error::BoxDynError, postgres::PgRow, query::Query, }; use sqlx::{Postgres, Row}; use taler_common::{ @@ -50,36 +49,69 @@ pub enum IncomingType { wad, } +/* ------ Serialization ----- */ + +#[macro_export] +macro_rules! serialized { + ($logic:expr) => {{ + let mut attempts = 0; + const MAX_RETRIES: u32 = 5; + /// PostgreSQL serialization failure error code (40001) + const PG_SERIALIZATION_FAILURE: &str = "40001"; + /// PostgreSQL deadlock detected error code (40P01) + const PG_DEADLOCK_DETECTED: &str = "40P01"; + + loop { + match $logic.await { + Ok(res) => break Ok(res), + Err(sqlx::Error::Database(e)) + if matches!( + e.downcast_ref::<sqlx::postgres::PgDatabaseError>().code(), + PG_SERIALIZATION_FAILURE | PG_DEADLOCK_DETECTED + ) && attempts < MAX_RETRIES => + { + attempts += 1; + tokio::task::yield_now().await; + continue; + } + Err(e) => break Err(e), + } + } + }}; +} + /* ----- Routines ------ */ pub async fn page<'a, 'b, R: Send + Unpin>( - pool: impl PgExecutor<'b>, + db: &PgPool, id_col: &str, params: &Page, - prepare: impl Fn() -> QueryBuilder<'a, Postgres>, - map: impl Fn(PgRow) -> Result<R, Error> + Send, + prepare: impl Fn() -> QueryBuilder<'a, Postgres> + Copy, + map: impl Fn(PgRow) -> Result<R, Error> + Send + Copy, ) -> Result<Vec<R>, Error> { - let mut builder = prepare(); - if let Some(offset) = params.offset { + serialized!(async { + let mut builder = prepare(); + if let Some(offset) = params.offset { + builder + .push(format_args!( + " {id_col} {}", + if params.backward() { '<' } else { '>' } + )) + .push_bind(offset); + } else { + builder.push("TRUE"); + } + builder.push(format_args!( + " ORDER BY {id_col} {} LIMIT ", + if params.backward() { "DESC" } else { "ASC" } + )); builder - .push(format_args!( - " {id_col} {}", - if params.backward() { '<' } else { '>' } - )) - .push_bind(offset); - } else { - builder.push("TRUE"); - } - builder.push(format_args!( - " ORDER BY {id_col} {} LIMIT ", - if params.backward() { "DESC" } else { "ASC" } - )); - builder - .push_bind(params.limit.abs()) - .build() - .try_map(map) - .fetch_all(pool) - .await + .push_bind(params.limit.abs()) + .build() + .try_map(map) + .fetch_all(db) + .await + }) } pub async fn history<'a, 'b, R: Send + Unpin>( diff --git a/common/taler-api/tests/common/db.rs b/common/taler-api/tests/common/db.rs @@ -16,7 +16,10 @@ use jiff::Timestamp; use sqlx::{PgPool, QueryBuilder, Row, postgres::PgRow}; -use taler_api::db::{BindHelper, IncomingType, TypeHelper, history, page}; +use taler_api::{ + db::{BindHelper, IncomingType, TypeHelper, history, page}, + serialized, +}; use taler_common::{ api_common::{EddsaPublicKey, SafeU64}, api_params::{History, Page}, @@ -53,34 +56,36 @@ pub enum TransferResult { WtidReuse, } -pub async fn transfer(db: &PgPool, transfer: TransferRequest) -> sqlx::Result<TransferResult> { - sqlx::query( - " +pub async fn transfer(db: &PgPool, transfer: &TransferRequest) -> sqlx::Result<TransferResult> { + let subject = &format!("{} {}", transfer.wtid, transfer.exchange_base_url); + serialized!( + sqlx::query( + " SELECT out_request_uid_reuse, out_wtid_reuse, out_transfer_row_id, out_created_at FROM taler_transfer($1, $2, $3, $4, $5, $6, $7) ", - ) - .bind(&transfer.amount) - .bind(transfer.exchange_base_url.as_str()) - .bind(format!("{} {}", transfer.wtid, transfer.exchange_base_url)) - .bind(transfer.credit_account.raw()) - .bind(transfer.request_uid) - .bind(transfer.wtid) - .bind_timestamp(&Timestamp::now()) - .try_map(|r: PgRow| { - Ok(if r.try_get_flag("out_request_uid_reuse")? { - TransferResult::RequestUidReuse - } else if r.try_get_flag("out_wtid_reuse")? { - TransferResult::WtidReuse - } else { - TransferResult::Success(TransferResponse { - row_id: r.try_get_safeu64("out_transfer_row_id")?, - timestamp: r.try_get_timestamp("out_created_at")?.into(), + ) + .bind(&transfer.amount) + .bind(transfer.exchange_base_url.as_str()) + .bind(subject) + .bind(transfer.credit_account.raw()) + .bind(&transfer.request_uid) + .bind(&transfer.wtid) + .bind_timestamp(&Timestamp::now()) + .try_map(|r: PgRow| { + Ok(if r.try_get_flag("out_request_uid_reuse")? { + TransferResult::RequestUidReuse + } else if r.try_get_flag("out_wtid_reuse")? { + TransferResult::WtidReuse + } else { + TransferResult::Success(TransferResponse { + row_id: r.try_get_safeu64("out_transfer_row_id")?, + timestamp: r.try_get_timestamp("out_created_at")?.into(), + }) }) }) - }) - .fetch_one(db) - .await + .fetch_one(db) + ) } pub async fn transfer_page( @@ -128,8 +133,9 @@ pub async fn transfer_by_id( id: u64, currency: &Currency, ) -> sqlx::Result<Option<TransferStatus>> { - sqlx::query( - " + serialized!( + sqlx::query( + " SELECT status, status_msg, @@ -140,21 +146,21 @@ pub async fn transfer_by_id( created_at FROM transfer WHERE transfer_id = $1 ", - ) - .bind(id as i64) - .try_map(|r: PgRow| { - Ok(TransferStatus { - status: r.try_get("status")?, - status_msg: r.try_get("status_msg")?, - amount: r.try_get_amount("amount", currency)?, - origin_exchange_url: r.try_get("exchange_base_url")?, - wtid: r.try_get("wtid")?, - credit_account: r.try_get_payto("credit_payto")?, - timestamp: r.try_get_timestamp("created_at")?.into(), + ) + .bind(id as i64) + .try_map(|r: PgRow| { + Ok(TransferStatus { + status: r.try_get("status")?, + status_msg: r.try_get("status_msg")?, + amount: r.try_get_amount("amount", currency)?, + origin_exchange_url: r.try_get("exchange_base_url")?, + wtid: r.try_get("wtid")?, + credit_account: r.try_get_payto("credit_payto")?, + timestamp: r.try_get_timestamp("created_at")?.into(), + }) }) - }) - .fetch_optional(db) - .await + .fetch_optional(db) + ) } pub async fn outgoing_revenue( @@ -211,30 +217,31 @@ pub async fn add_incoming( kind: IncomingType, key: &EddsaPublicKey, ) -> sqlx::Result<AddIncomingResult> { - sqlx::query( - " + serialized!( + sqlx::query( + " SELECT out_reserve_pub_reuse, out_tx_row_id, out_created_at FROM add_incoming($1, $2, $3, $4, $5, $6) ", - ) - .bind(amount) - .bind(subject) - .bind(debit_account.raw()) - .bind(kind) - .bind(key) - .bind_timestamp(timestamp) - .try_map(|r: PgRow| { - Ok(if r.try_get_flag("out_reserve_pub_reuse")? { - AddIncomingResult::ReservePubReuse - } else { - AddIncomingResult::Success { - id: r.try_get_safeu64("out_tx_row_id")?, - created_at: r.try_get_timestamp("out_created_at")?, - } + ) + .bind(amount) + .bind(subject) + .bind(debit_account.raw()) + .bind(kind) + .bind(key) + .bind_timestamp(timestamp) + .try_map(|r: PgRow| { + Ok(if r.try_get_flag("out_reserve_pub_reuse")? { + AddIncomingResult::ReservePubReuse + } else { + AddIncomingResult::Success { + id: r.try_get_safeu64("out_tx_row_id")?, + created_at: r.try_get_timestamp("out_created_at")?, + } + }) }) - }) - .fetch_one(db) - .await + .fetch_one(db) + ) } pub async fn incoming_history( diff --git a/common/taler-api/tests/common/mod.rs b/common/taler-api/tests/common/mod.rs @@ -61,7 +61,7 @@ impl TalerApi for TestApi { impl WireGateway for TestApi { async fn transfer(&self, req: TransferRequest) -> ApiResult<TransferResponse> { - let result = db::transfer(&self.pool, req).await?; + let result = db::transfer(&self.pool, &req).await?; match result { db::TransferResult::Success(transfer_response) => Ok(transfer_response), db::TransferResult::RequestUidReuse => Err(failure( diff --git a/taler-cyclos/src/db.rs b/taler-cyclos/src/db.rs @@ -19,9 +19,10 @@ use std::fmt::Display; use compact_str::CompactString; use jiff::Timestamp; use serde::{Serialize, de::DeserializeOwned}; -use sqlx::{PgConnection, PgExecutor, PgPool, QueryBuilder, Row, postgres::PgRow}; +use sqlx::{PgConnection, PgPool, QueryBuilder, Row, postgres::PgRow}; use taler_api::{ db::{BindHelper, IncomingType, TypeHelper, history, page}, + serialized, subject::{IncomingSubject, OutgoingSubject}, }; use taler_common::{ @@ -208,32 +209,33 @@ pub async fn register_tx_in_admin( tx: &TxInAdmin, now: &Timestamp, ) -> sqlx::Result<AddIncomingResult> { - sqlx::query( - " + serialized!( + sqlx::query( + " SELECT out_reserve_pub_reuse, out_tx_row_id, out_valued_at, out_new FROM register_tx_in(NULL, NULL, $1, $2, $3, $4, $5, $6, $7, $5) ", - ) - .bind(tx.amount) - .bind(&tx.subject) - .bind(tx.debtor_id) - .bind(&tx.debtor_name) - .bind_timestamp(now) - .bind(tx.metadata.ty()) - .bind(tx.metadata.key()) - .try_map(|r: PgRow| { - Ok(if r.try_get_flag(0)? { - AddIncomingResult::ReservePubReuse - } else { - AddIncomingResult::Success { - row_id: r.try_get_u64(1)?, - valued_at: r.try_get_timestamp(2)?, - new: r.try_get(3)?, - } + ) + .bind(tx.amount) + .bind(&tx.subject) + .bind(tx.debtor_id) + .bind(&tx.debtor_name) + .bind_timestamp(now) + .bind(tx.metadata.ty()) + .bind(tx.metadata.key()) + .try_map(|r: PgRow| { + Ok(if r.try_get_flag(0)? { + AddIncomingResult::ReservePubReuse + } else { + AddIncomingResult::Success { + row_id: r.try_get_u64(1)?, + valued_at: r.try_get_timestamp(2)?, + new: r.try_get(3)?, + } + }) }) - }) - .fetch_one(db) - .await + .fetch_one(db) + ) } pub async fn register_tx_in( @@ -242,35 +244,36 @@ pub async fn register_tx_in( subject: &Option<IncomingSubject>, now: &Timestamp, ) -> sqlx::Result<AddIncomingResult> { - sqlx::query( - " + serialized!( + sqlx::query( + " SELECT out_reserve_pub_reuse, out_tx_row_id, out_valued_at, out_new FROM register_tx_in($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) ", - ) - .bind(tx.transfer_id) - .bind(tx.tx_id) - .bind(tx.amount) - .bind(&tx.subject) - .bind(tx.debtor_id) - .bind(&tx.debtor_name) - .bind(tx.valued_at.as_microsecond()) - .bind(subject.as_ref().map(|it| it.ty())) - .bind(subject.as_ref().map(|it| it.key())) - .bind(now.as_microsecond()) - .try_map(|r: PgRow| { - Ok(if r.try_get_flag(0)? { - AddIncomingResult::ReservePubReuse - } else { - AddIncomingResult::Success { - row_id: r.try_get_u64(1)?, - valued_at: r.try_get_timestamp(2)?, - new: r.try_get(3)?, - } + ) + .bind(tx.transfer_id) + .bind(tx.tx_id) + .bind(tx.amount) + .bind(&tx.subject) + .bind(tx.debtor_id) + .bind(&tx.debtor_name) + .bind(tx.valued_at.as_microsecond()) + .bind(subject.as_ref().map(|it| it.ty())) + .bind(subject.as_ref().map(|it| it.key())) + .bind(now.as_microsecond()) + .try_map(|r: PgRow| { + Ok(if r.try_get_flag(0)? { + AddIncomingResult::ReservePubReuse + } else { + AddIncomingResult::Success { + row_id: r.try_get_u64(1)?, + valued_at: r.try_get_timestamp(2)?, + new: r.try_get(3)?, + } + }) }) - }) - .fetch_one(db) - .await + .fetch_one(&mut *db) + ) } #[derive(Debug)] @@ -304,40 +307,43 @@ pub async fn register_tx_out( kind: &TxOutKind, now: &Timestamp, ) -> sqlx::Result<AddOutgoingResult> { - let query = sqlx::query( - " - SELECT out_result, out_tx_row_id - FROM register_tx_out($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) - ", - ) - .bind(tx.transfer_id) - .bind(tx.tx_id) - .bind(tx.amount) - .bind(&tx.subject) - .bind(tx.creditor_id) - .bind(&tx.creditor_name) - .bind_timestamp(&tx.valued_at); - let query = match kind { - TxOutKind::Simple => query - .bind(None::<&[u8]>) - .bind(None::<&str>) - .bind(None::<i64>), - TxOutKind::Bounce(bounced) => query.bind(None::<&[u8]>).bind(None::<&str>).bind(*bounced), - TxOutKind::Talerable(subject) => query - .bind(&subject.wtid) - .bind(subject.exchange_base_url.as_str()) - .bind(None::<i64>), - }; - query - .bind_timestamp(now) - .try_map(|r: PgRow| { - Ok(AddOutgoingResult { - result: r.try_get(0)?, - row_id: r.try_get(1)?, + serialized!({ + let query = sqlx::query( + " + SELECT out_result, out_tx_row_id + FROM register_tx_out($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) + ", + ) + .bind(tx.transfer_id) + .bind(tx.tx_id) + .bind(tx.amount) + .bind(&tx.subject) + .bind(tx.creditor_id) + .bind(&tx.creditor_name) + .bind_timestamp(&tx.valued_at); + let query = match kind { + TxOutKind::Simple => query + .bind(None::<&[u8]>) + .bind(None::<&str>) + .bind(None::<i64>), + TxOutKind::Bounce(bounced) => { + query.bind(None::<&[u8]>).bind(None::<&str>).bind(*bounced) + } + TxOutKind::Talerable(subject) => query + .bind(&subject.wtid) + .bind(subject.exchange_base_url.as_str()) + .bind(None::<i64>), + }; + query + .bind_timestamp(now) + .try_map(|r: PgRow| { + Ok(AddOutgoingResult { + result: r.try_get(0)?, + row_id: r.try_get(1)?, + }) }) - }) - .fetch_one(db) - .await + .fetch_one(&mut *db) + }) } #[derive(Debug, PartialEq, Eq)] @@ -357,40 +363,41 @@ pub struct Transfer { pub creditor_name: CompactString, } -pub async fn make_transfer<'a>( - db: impl PgExecutor<'a>, +pub async fn make_transfer( + db: &PgPool, tx: &Transfer, now: &Timestamp, ) -> sqlx::Result<TransferResult> { let subject = format!("{} {}", tx.wtid, tx.exchange_base_url); - sqlx::query( - " - SELECT out_request_uid_reuse, out_wtid_reuse, out_initiated_row_id, out_initiated_at - FROM taler_transfer($1, $2, $3, $4, $5, $6, $7, $8) - ", - ) - .bind(&tx.request_uid) - .bind(&tx.wtid) - .bind(&subject) - .bind(tx.amount) - .bind(tx.exchange_base_url.as_str()) - .bind(tx.creditor_id) - .bind(&tx.creditor_name) - .bind_timestamp(now) - .try_map(|r: PgRow| { - Ok(if r.try_get_flag(0)? { - TransferResult::RequestUidReuse - } else if r.try_get_flag(1)? { - TransferResult::WtidReuse - } else { - TransferResult::Success { - id: r.try_get_u64(2)?, - initiated_at: r.try_get_timestamp(3)?, - } + serialized!( + sqlx::query( + " + SELECT out_request_uid_reuse, out_wtid_reuse, out_initiated_row_id, out_initiated_at + FROM taler_transfer($1, $2, $3, $4, $5, $6, $7, $8) + ", + ) + .bind(&tx.request_uid) + .bind(&tx.wtid) + .bind(&subject) + .bind(tx.amount) + .bind(tx.exchange_base_url.as_str()) + .bind(tx.creditor_id) + .bind(&tx.creditor_name) + .bind_timestamp(now) + .try_map(|r: PgRow| { + Ok(if r.try_get_flag(0)? { + TransferResult::RequestUidReuse + } else if r.try_get_flag(1)? { + TransferResult::WtidReuse + } else { + TransferResult::Success { + id: r.try_get_u64(2)?, + initiated_at: r.try_get_timestamp(3)?, + } + }) }) - }) - .fetch_one(db) - .await + .fetch_one(db) + ) } #[derive(Debug, PartialEq, Eq)] @@ -406,34 +413,35 @@ pub async fn register_bounced_tx_in( reason: &str, now: &Timestamp, ) -> sqlx::Result<BounceResult> { - sqlx::query( - " - SELECT out_tx_row_id, out_tx_new - FROM register_bounced_tx_in($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) - ", - ) - .bind(tx.transfer_id) - .bind(tx.tx_id) - .bind(tx.amount) - .bind(&tx.subject) - .bind(tx.debtor_id) - .bind(&tx.debtor_name) - .bind_timestamp(&tx.valued_at) - .bind(chargeback_id) - .bind(reason) - .bind_timestamp(now) - .try_map(|r: PgRow| { - Ok(BounceResult { - tx_id: r.try_get_u64(0)?, - tx_new: r.try_get(1)?, + serialized!( + sqlx::query( + " + SELECT out_tx_row_id, out_tx_new + FROM register_bounced_tx_in($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) + ", + ) + .bind(tx.transfer_id) + .bind(tx.tx_id) + .bind(tx.amount) + .bind(&tx.subject) + .bind(tx.debtor_id) + .bind(&tx.debtor_name) + .bind_timestamp(&tx.valued_at) + .bind(chargeback_id) + .bind(reason) + .bind_timestamp(now) + .try_map(|r: PgRow| { + Ok(BounceResult { + tx_id: r.try_get_u64(0)?, + tx_new: r.try_get(1)?, + }) }) - }) - .fetch_one(db) - .await + .fetch_one(&mut *db) + ) } -pub async fn transfer_page<'a>( - db: impl PgExecutor<'a>, +pub async fn transfer_page( + db: &PgPool, status: &Option<TransferState>, currency: &Currency, root: &CompactString, @@ -621,51 +629,53 @@ pub async fn revenue_history( .await } -pub async fn transfer_by_id<'a>( - db: impl PgExecutor<'a>, +pub async fn transfer_by_id( + db: &PgPool, id: u64, currency: &Currency, root: &CompactString, ) -> sqlx::Result<Option<TransferStatus>> { - sqlx::query( - " - SELECT - status, - status_msg, - amount, - exchange_base_url, - wtid, - credit_account, - credit_name, - initiated_at - FROM transfer - JOIN initiated USING (initiated_id) - WHERE initiated_id = $1 - ", - ) - .bind(id as i64) - .try_map(|r: PgRow| { - Ok(TransferStatus { - status: r.try_get(0)?, - status_msg: r.try_get(1)?, - amount: r.try_get_amount(2, currency)?, - origin_exchange_url: r.try_get(3)?, - wtid: r.try_get(4)?, - credit_account: r.try_get_cyclos_fullpaytouri(5, 6, root)?, - timestamp: r.try_get_timestamp(7)?.into(), + serialized!( + sqlx::query( + " + SELECT + status, + status_msg, + amount, + exchange_base_url, + wtid, + credit_account, + credit_name, + initiated_at + FROM transfer + JOIN initiated USING (initiated_id) + WHERE initiated_id = $1 + ", + ) + .bind(id as i64) + .try_map(|r: PgRow| { + Ok(TransferStatus { + status: r.try_get(0)?, + status_msg: r.try_get(1)?, + amount: r.try_get_amount(2, currency)?, + origin_exchange_url: r.try_get(3)?, + wtid: r.try_get(4)?, + credit_account: r.try_get_cyclos_fullpaytouri(5, 6, root)?, + timestamp: r.try_get_timestamp(7)?.into(), + }) }) - }) - .fetch_optional(db) - .await + .fetch_optional(db) + ) } /** Get a batch of pending initiated transactions not attempted since [start] */ -pub async fn pending_batch<'a>( - db: impl PgExecutor<'a>, +pub async fn pending_batch( + db: &mut PgConnection, start: &Timestamp, ) -> sqlx::Result<Vec<Initiated>> { - sqlx::query( - " + serialized!( + sqlx::query( + " SELECT initiated_id, amount, subject, credit_account, credit_name FROM initiated WHERE tx_id IS NULL @@ -673,58 +683,62 @@ pub async fn pending_batch<'a>( AND (last_submitted IS NULL OR last_submitted < $1) LIMIT 100 ", - ) - .bind_timestamp(start) - .try_map(|r: PgRow| { - Ok(Initiated { - id: r.try_get(0)?, - amount: r.try_get(1)?, - subject: r.try_get(2)?, - creditor_id: r.try_get(3)?, - creditor_name: r.try_get(4)?, + ) + .bind_timestamp(start) + .try_map(|r: PgRow| { + Ok(Initiated { + id: r.try_get(0)?, + amount: r.try_get(1)?, + subject: r.try_get(2)?, + creditor_id: r.try_get(3)?, + creditor_name: r.try_get(4)?, + }) }) - }) - .fetch_all(db) - .await + .fetch_all(&mut *db) + ) } /** Update status of a successful submitted initiated transaction */ -pub async fn initiated_submit_success<'a>( - db: impl PgExecutor<'a>, +pub async fn initiated_submit_success( + db: &mut PgConnection, initiated_id: i64, timestamp: &Timestamp, tx_id: i64, ) -> sqlx::Result<()> { - sqlx::query( - " - UPDATE initiated - SET status='pending', submission_counter=submission_counter+1, last_submitted=$1, tx_id=$2 - WHERE initiated_id=$3 - " - ).bind_timestamp(timestamp) - .bind(tx_id) - .bind(initiated_id) - .execute(db).await?; + serialized!( + sqlx::query( + " + UPDATE initiated + SET status='pending', submission_counter=submission_counter+1, last_submitted=$1, tx_id=$2 + WHERE initiated_id=$3 + " + ) + .bind_timestamp(timestamp) + .bind(tx_id) + .bind(initiated_id) + .execute(&mut *db) + )?; Ok(()) } /** Update status of a permanently failed initiated transaction */ -pub async fn initiated_submit_permanent_failure<'a>( - db: impl PgExecutor<'a>, +pub async fn initiated_submit_permanent_failure( + db: &mut PgConnection, initiated_id: i64, msg: &str, ) -> sqlx::Result<()> { - sqlx::query( - " - UPDATE initiated - SET status='permanent_failure', status_msg=$1 - WHERE initiated_id=$2 - ", - ) - .bind(msg) - .bind(initiated_id) - .execute(db) - .await?; + serialized!( + sqlx::query( + " + UPDATE initiated + SET status='permanent_failure', status_msg=$1 + WHERE initiated_id=$2 + ", + ) + .bind(msg) + .bind(initiated_id) + .execute(&mut *db) + )?; Ok(()) } @@ -740,7 +754,7 @@ pub async fn initiated_chargeback_failure( db: &mut PgConnection, transfer_id: i64, ) -> sqlx::Result<ChargebackFailureResult> { - Ok( + Ok(serialized!( sqlx::query("SELECT out_initiated_id, out_new FROM register_charge_back_failure($1)") .bind(transfer_id) .try_map(|r: PgRow| { @@ -753,35 +767,32 @@ pub async fn initiated_chargeback_failure( ChargebackFailureResult::Idempotent(id) }) }) - .fetch_optional(db) - .await? - .unwrap_or(ChargebackFailureResult::Unknown), - ) + .fetch_optional(&mut *db) + )? + .unwrap_or(ChargebackFailureResult::Unknown)) } /** Get JSON value from KV table */ -pub async fn kv_get<'a, T: DeserializeOwned + Unpin + Send>( - db: impl PgExecutor<'a>, +pub async fn kv_get<T: DeserializeOwned + Unpin + Send>( + db: &mut PgConnection, key: &str, ) -> sqlx::Result<Option<T>> { - sqlx::query("SELECT value FROM kv WHERE key=$1") - .bind(key) - .try_map(|r| Ok(r.try_get::<sqlx::types::Json<T>, _>(0)?.0)) - .fetch_optional(db) - .await + serialized!( + sqlx::query("SELECT value FROM kv WHERE key=$1") + .bind(key) + .try_map(|r| Ok(r.try_get::<sqlx::types::Json<T>, _>(0)?.0)) + .fetch_optional(&mut *db) + ) } /** Set JSON value in KV table */ -pub async fn kv_set<'a, T: Serialize>( - db: impl PgExecutor<'a>, - key: &str, - value: &T, -) -> sqlx::Result<()> { - sqlx::query("INSERT INTO kv (key, value) VALUES ($1, $2) ON CONFLICT (key) DO UPDATE SET value=EXCLUDED.value") - .bind(key) - .bind(sqlx::types::Json(value)) - .execute(db) - .await?; +pub async fn kv_set<T: Serialize>(db: &mut PgConnection, key: &str, value: &T) -> sqlx::Result<()> { + serialized!( + sqlx::query("INSERT INTO kv (key, value) VALUES ($1, $2) ON CONFLICT (key) DO UPDATE SET value=EXCLUDED.value") + .bind(key) + .bind(sqlx::types::Json(value)) + .execute(&mut *db) + )?; Ok(()) } @@ -840,7 +851,7 @@ mod test { use compact_str::CompactString; use jiff::{Span, Timestamp}; use serde_json::json; - use sqlx::{PgConnection, PgPool, Postgres, Row as _, pool::PoolConnection, postgres::PgRow}; + use sqlx::{PgPool, Postgres, Row as _, pool::PoolConnection, postgres::PgRow}; use taler_api::{ db::TypeHelper, notification::dummy_listen, @@ -882,15 +893,15 @@ mod test { }); assert_eq!( - db::kv_get::<serde_json::Value>(&mut *db, "value") + db::kv_get::<serde_json::Value>(&mut db, "value") .await .unwrap(), None ); - db::kv_set(&mut *db, "value", &value).await.unwrap(); - db::kv_set(&mut *db, "value", &value).await.unwrap(); + db::kv_set(&mut db, "value", &value).await.unwrap(); + db::kv_set(&mut db, "value", &value).await.unwrap(); assert_eq!( - db::kv_get::<serde_json::Value>(&mut *db, "value") + db::kv_get::<serde_json::Value>(&mut db, "value") .await .unwrap(), Some(value) @@ -901,11 +912,8 @@ mod test { async fn tx_in() { let (mut db, pool) = setup().await; - async fn routine( - db: &mut PgConnection, - first: &Option<IncomingSubject>, - second: &Option<IncomingSubject>, - ) { + let mut routine = async |first: &Option<IncomingSubject>, + second: &Option<IncomingSubject>| { let id = sqlx::query("SELECT count(*) + 1 FROM tx_in") .try_map(|r: PgRow| r.try_get_u64(0)) .fetch_one(&mut *db) @@ -924,7 +932,7 @@ mod test { }; // Insert assert_eq!( - db::register_tx_in(db, &tx, first, &now) + db::register_tx_in(&mut db, &tx, first, &now) .await .expect("register tx in"), AddIncomingResult::Success { @@ -936,7 +944,7 @@ mod test { // Idempotent assert_eq!( db::register_tx_in( - db, + &mut db, &TxIn { valued_at: later, ..tx.clone() @@ -955,7 +963,7 @@ mod test { // Many assert_eq!( db::register_tx_in( - db, + &mut db, &TxIn { transfer_id: later.as_microsecond() as i64, valued_at: later, @@ -972,7 +980,7 @@ mod test { valued_at: later } ); - } + }; // Empty db assert_eq!( @@ -989,11 +997,10 @@ mod test { ); // Regular transaction - routine(&mut db, &None, &None).await; + routine(&None, &None).await; // Reserve transaction routine( - &mut db, &Some(IncomingSubject::Reserve(EddsaPublicKey::rand())), &Some(IncomingSubject::Reserve(EddsaPublicKey::rand())), ) @@ -1001,7 +1008,6 @@ mod test { // Kyc transaction routine( - &mut db, &Some(IncomingSubject::Kyc(EddsaPublicKey::rand())), &Some(IncomingSubject::Kyc(EddsaPublicKey::rand())), ) @@ -1101,7 +1107,7 @@ mod test { async fn tx_out() { let (mut db, pool) = setup().await; - async fn routine(db: &mut PgConnection, first: &TxOutKind, second: &TxOutKind) { + let mut routine = async |first: &TxOutKind, second: &TxOutKind| { let transfer_id = sqlx::query("SELECT count(*) + 1 FROM tx_out") .try_map(|r: PgRow| r.try_get(0)) .fetch_one(&mut *db) @@ -1120,7 +1126,7 @@ mod test { }; assert!(matches!( db::make_transfer( - &mut *db, + &pool, &Transfer { request_uid: HashCode::rand(), amount: decimal("10"), @@ -1135,13 +1141,13 @@ mod test { .unwrap(), TransferResult::Success { .. } )); - db::initiated_submit_success(&mut *db, 1, &Timestamp::now(), transfer_id) + db::initiated_submit_success(&mut db, 1, &Timestamp::now(), transfer_id) .await .expect("status success"); // Insert assert_eq!( - db::register_tx_out(&mut *db, &tx, first, &now) + db::register_tx_out(&mut db, &tx, first, &now) .await .expect("register tx out"), AddOutgoingResult { @@ -1152,7 +1158,7 @@ mod test { // Idempotent assert_eq!( db::register_tx_out( - &mut *db, + &mut db, &TxOut { valued_at: later, ..tx.clone() @@ -1170,7 +1176,7 @@ mod test { // Recovered assert_eq!( db::register_tx_out( - &mut *db, + &mut db, &TxOut { transfer_id: transfer_id + 1, tx_id: Some(transfer_id + 1), @@ -1187,7 +1193,7 @@ mod test { row_id: transfer_id + 1, } ); - } + }; // Empty db assert_eq!( @@ -1198,18 +1204,17 @@ mod test { ); // Regular transaction - routine(&mut db, &TxOutKind::Simple, &TxOutKind::Simple).await; + routine(&TxOutKind::Simple, &TxOutKind::Simple).await; // Talerable transaction routine( - &mut db, &TxOutKind::Talerable(OutgoingSubject::rand()), &TxOutKind::Talerable(OutgoingSubject::rand()), ) .await; // Bounced transaction - routine(&mut db, &TxOutKind::Bounce(21), &TxOutKind::Bounce(42)).await; + routine(&TxOutKind::Bounce(21), &TxOutKind::Bounce(42)).await; // History assert_eq!( @@ -1225,17 +1230,17 @@ mod test { #[tokio::test] async fn transfer() { - let (mut db, _) = setup().await; + let (_, pool) = setup().await; // Empty db assert_eq!( - db::transfer_by_id(&mut *db, 0, &CURRENCY, &ROOT) + db::transfer_by_id(&pool, 0, &CURRENCY, &ROOT) .await .unwrap(), None ); assert_eq!( - db::transfer_page(&mut *db, &None, &CURRENCY, &ROOT, &Page::default()) + db::transfer_page(&pool, &None, &CURRENCY, &ROOT, &Page::default()) .await .unwrap(), Vec::new() @@ -1253,7 +1258,7 @@ mod test { let later = now + Span::new().hours(2); // Insert assert_eq!( - db::make_transfer(&mut *db, &req, &now) + db::make_transfer(&pool, &req, &now) .await .expect("transfer"), TransferResult::Success { @@ -1263,7 +1268,7 @@ mod test { ); // Idempotent assert_eq!( - db::make_transfer(&mut *db, &req, &later) + db::make_transfer(&pool, &req, &later) .await .expect("transfer"), TransferResult::Success { @@ -1274,7 +1279,7 @@ mod test { // Request UID reuse assert_eq!( db::make_transfer( - &mut *db, + &pool, &Transfer { wtid: ShortHashCode::rand(), ..req.clone() @@ -1288,7 +1293,7 @@ mod test { // wtid reuse assert_eq!( db::make_transfer( - &mut *db, + &pool, &Transfer { request_uid: HashCode::rand(), ..req.clone() @@ -1302,7 +1307,7 @@ mod test { // Many assert_eq!( db::make_transfer( - &mut *db, + &pool, &Transfer { request_uid: HashCode::rand(), wtid: ShortHashCode::rand(), @@ -1320,25 +1325,25 @@ mod test { // Get assert!( - db::transfer_by_id(&mut *db, 1, &CURRENCY, &ROOT) + db::transfer_by_id(&pool, 1, &CURRENCY, &ROOT) .await .unwrap() .is_some() ); assert!( - db::transfer_by_id(&mut *db, 2, &CURRENCY, &ROOT) + db::transfer_by_id(&pool, 2, &CURRENCY, &ROOT) .await .unwrap() .is_some() ); assert!( - db::transfer_by_id(&mut *db, 3, &CURRENCY, &ROOT) + db::transfer_by_id(&pool, 3, &CURRENCY, &ROOT) .await .unwrap() .is_none() ); assert_eq!( - db::transfer_page(&mut *db, &None, &CURRENCY, &ROOT, &Page::default()) + db::transfer_page(&pool, &None, &CURRENCY, &ROOT, &Page::default()) .await .unwrap() .len(), @@ -1430,15 +1435,10 @@ mod test { #[tokio::test] async fn status() { - let (mut db, _) = setup().await; + let (mut db, pool) = setup().await; - async fn check_status( - db: &mut PgConnection, - id: u64, - status: TransferState, - msg: Option<&str>, - ) { - let transfer = db::transfer_by_id(db, id, &CURRENCY, &ROOT) + let check_status = async |id: u64, status: TransferState, msg: Option<&str>| { + let transfer = db::transfer_by_id(&pool, id, &CURRENCY, &ROOT) .await .unwrap() .unwrap(); @@ -1446,13 +1446,13 @@ mod test { (status, msg), (transfer.status, transfer.status_msg.as_deref()) ); - } + }; // Unknown transfer - db::initiated_submit_permanent_failure(&mut *db, 1, "msg") + db::initiated_submit_permanent_failure(&mut db, 1, "msg") .await .unwrap(); - db::initiated_submit_success(&mut *db, 1, &Timestamp::now(), 12) + db::initiated_submit_success(&mut db, 1, &Timestamp::now(), 12) .await .unwrap(); assert_eq!( @@ -1462,7 +1462,7 @@ mod test { // Failure db::make_transfer( - &mut *db, + &pool, &Transfer { request_uid: HashCode::rand(), amount: decimal("1"), @@ -1475,21 +1475,15 @@ mod test { ) .await .expect("transfer"); - check_status(&mut db, 1, TransferState::pending, None).await; - db::initiated_submit_permanent_failure(&mut *db, 1, "error status") + check_status(1, TransferState::pending, None).await; + db::initiated_submit_permanent_failure(&mut db, 1, "error status") .await .unwrap(); - check_status( - &mut db, - 1, - TransferState::permanent_failure, - Some("error status"), - ) - .await; + check_status(1, TransferState::permanent_failure, Some("error status")).await; // Success db::make_transfer( - &mut *db, + &pool, &Transfer { request_uid: HashCode::rand(), amount: decimal("1"), @@ -1502,11 +1496,11 @@ mod test { ) .await .expect("transfer"); - check_status(&mut db, 2, TransferState::pending, None).await; - db::initiated_submit_success(&mut *db, 2, &Timestamp::now(), 3) + check_status(2, TransferState::pending, None).await; + db::initiated_submit_success(&mut db, 2, &Timestamp::now(), 3) .await .unwrap(); - check_status(&mut db, 2, TransferState::pending, None).await; + check_status(2, TransferState::pending, None).await; db::register_tx_out( &mut db, &TxOut { @@ -1523,20 +1517,14 @@ mod test { ) .await .unwrap(); - check_status(&mut db, 2, TransferState::success, None).await; + check_status(2, TransferState::success, None).await; // Chargeback assert_eq!( db::initiated_chargeback_failure(&mut db, 5).await.unwrap(), ChargebackFailureResult::Known(2) ); - check_status( - &mut db, - 2, - TransferState::late_failure, - Some("charged back"), - ) - .await; + check_status(2, TransferState::late_failure, Some("charged back")).await; assert_eq!( db::initiated_chargeback_failure(&mut db, 5).await.unwrap(), ChargebackFailureResult::Idempotent(2) @@ -1545,11 +1533,11 @@ mod test { #[tokio::test] async fn batch() { - let (mut db, _) = setup().await; + let (mut db, pool) = setup().await; let start = Timestamp::now(); // Empty db - let pendings = db::pending_batch(&mut *db, &start) + let pendings = db::pending_batch(&mut db, &start) .await .expect("pending_batch"); assert_eq!(pendings.len(), 0); @@ -1557,7 +1545,7 @@ mod test { // Some transfers for i in 0..3 { db::make_transfer( - &mut *db, + &pool, &Transfer { request_uid: HashCode::rand(), amount: decimal(format!("{}", i + 1)), @@ -1571,7 +1559,7 @@ mod test { .await .expect("transfer"); } - let pendings = db::pending_batch(&mut *db, &start) + let pendings = db::pending_batch(&mut db, &start) .await .expect("pending_batch"); assert_eq!(pendings.len(), 3); @@ -1579,7 +1567,7 @@ mod test { // Max 100 txs in batch for i in 0..100 { db::make_transfer( - &mut *db, + &pool, &Transfer { request_uid: HashCode::rand(), amount: decimal(format!("{}", i + 1)), @@ -1593,29 +1581,29 @@ mod test { .await .expect("transfer"); } - let pendings = db::pending_batch(&mut *db, &start) + let pendings = db::pending_batch(&mut db, &start) .await .expect("pending_batch"); assert_eq!(pendings.len(), 100); // Skip uploaded for i in 0..=10 { - db::initiated_submit_success(&mut *db, i, &Timestamp::now(), i) + db::initiated_submit_success(&mut db, i, &Timestamp::now(), i) .await .expect("status success"); } - let pendings = db::pending_batch(&mut *db, &start) + let pendings = db::pending_batch(&mut db, &start) .await .expect("pending_batch"); assert_eq!(pendings.len(), 93); // Skip failed for i in 0..=10 { - db::initiated_submit_permanent_failure(&mut *db, 10 + i, "failure") + db::initiated_submit_permanent_failure(&mut db, 10 + i, "failure") .await .expect("status failure"); } - let pendings = db::pending_batch(&mut *db, &start) + let pendings = db::pending_batch(&mut db, &start) .await .expect("pending_batch"); assert_eq!(pendings.len(), 83); diff --git a/taler-magnet-bank/src/db.rs b/taler-magnet-bank/src/db.rs @@ -18,9 +18,10 @@ use std::fmt::Display; use jiff::{Timestamp, civil::Date, tz::TimeZone}; use serde::{Serialize, de::DeserializeOwned}; -use sqlx::{PgConnection, PgExecutor, PgPool, QueryBuilder, Row, postgres::PgRow}; +use sqlx::{PgConnection, PgPool, QueryBuilder, Row, postgres::PgRow}; use taler_api::{ db::{BindHelper, IncomingType, TypeHelper, history, page}, + serialized, subject::{IncomingSubject, OutgoingSubject}, }; use taler_common::{ @@ -195,32 +196,33 @@ pub async fn register_tx_in_admin( tx: &TxInAdmin, now: &Timestamp, ) -> sqlx::Result<AddIncomingResult> { - sqlx::query( - " - SELECT out_reserve_pub_reuse, out_tx_row_id, out_valued_at, out_new - FROM register_tx_in(NULL, $1, $2, $3, $4, $5, $6, $7, $5) - ", - ) - .bind(&tx.amount) - .bind(&tx.subject) - .bind(tx.debtor.iban()) - .bind(&tx.debtor.name) - .bind_date(&now.to_zoned(TimeZone::UTC).date()) - .bind(tx.metadata.ty()) - .bind(tx.metadata.key()) - .try_map(|r: PgRow| { - Ok(if r.try_get_flag(0)? { - AddIncomingResult::ReservePubReuse - } else { - AddIncomingResult::Success { - row_id: r.try_get_u64(1)?, - valued_at: r.try_get_date(2)?, - new: r.try_get(3)?, - } + serialized!( + sqlx::query( + " + SELECT out_reserve_pub_reuse, out_tx_row_id, out_valued_at, out_new + FROM register_tx_in(NULL, $1, $2, $3, $4, $5, $6, $7, $5) + ", + ) + .bind(&tx.amount) + .bind(&tx.subject) + .bind(tx.debtor.iban()) + .bind(&tx.debtor.name) + .bind_date(&now.to_zoned(TimeZone::UTC).date()) + .bind(tx.metadata.ty()) + .bind(tx.metadata.key()) + .try_map(|r: PgRow| { + Ok(if r.try_get_flag(0)? { + AddIncomingResult::ReservePubReuse + } else { + AddIncomingResult::Success { + row_id: r.try_get_u64(1)?, + valued_at: r.try_get_date(2)?, + new: r.try_get(3)?, + } + }) }) - }) - .fetch_one(db) - .await + .fetch_one(db) + ) } pub async fn register_tx_in( @@ -229,34 +231,35 @@ pub async fn register_tx_in( subject: &Option<IncomingSubject>, now: &Timestamp, ) -> sqlx::Result<AddIncomingResult> { - sqlx::query( - " - SELECT out_reserve_pub_reuse, out_tx_row_id, out_valued_at, out_new - FROM register_tx_in($1, $2, $3, $4, $5, $6, $7, $8, $9) - ", - ) - .bind(tx.code as i64) - .bind(&tx.amount) - .bind(&tx.subject) - .bind(tx.debtor.iban()) - .bind(&tx.debtor.name) - .bind_date(&tx.value_date) - .bind(subject.as_ref().map(|it| it.ty())) - .bind(subject.as_ref().map(|it| it.key())) - .bind_timestamp(now) - .try_map(|r: PgRow| { - Ok(if r.try_get_flag(0)? { - AddIncomingResult::ReservePubReuse - } else { - AddIncomingResult::Success { - row_id: r.try_get_u64(1)?, - valued_at: r.try_get_date(2)?, - new: r.try_get(3)?, - } + serialized!( + sqlx::query( + " + SELECT out_reserve_pub_reuse, out_tx_row_id, out_valued_at, out_new + FROM register_tx_in($1, $2, $3, $4, $5, $6, $7, $8, $9) + ", + ) + .bind(tx.code as i64) + .bind(&tx.amount) + .bind(&tx.subject) + .bind(tx.debtor.iban()) + .bind(&tx.debtor.name) + .bind_date(&tx.value_date) + .bind(subject.as_ref().map(|it| it.ty())) + .bind(subject.as_ref().map(|it| it.key())) + .bind_timestamp(now) + .try_map(|r: PgRow| { + Ok(if r.try_get_flag(0)? { + AddIncomingResult::ReservePubReuse + } else { + AddIncomingResult::Success { + row_id: r.try_get_u64(1)?, + valued_at: r.try_get_date(2)?, + new: r.try_get(3)?, + } + }) }) - }) - .fetch_one(db) - .await + .fetch_one(&mut *db) + ) } #[derive(Debug)] @@ -290,42 +293,43 @@ pub async fn register_tx_out( kind: &TxOutKind, now: &Timestamp, ) -> sqlx::Result<AddOutgoingResult> { - let query = sqlx::query( - " - SELECT out_result, out_tx_row_id - FROM register_tx_out($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) - ", - ) - .bind(tx.code as i64) - .bind(&tx.amount) - .bind(&tx.subject) - .bind(tx.creditor.iban()) - .bind(&tx.creditor.name) - .bind_date(&tx.value_date); - let query = match kind { - TxOutKind::Simple => query - .bind(None::<&[u8]>) - .bind(None::<&str>) - .bind(None::<i64>), - TxOutKind::Bounce(bounced) => query - .bind(None::<&[u8]>) - .bind(None::<&str>) - .bind(*bounced as i64), - TxOutKind::Talerable(subject) => query - .bind(&subject.wtid) - .bind(subject.exchange_base_url.as_str()) - .bind(None::<i64>), - }; - query - .bind_timestamp(now) - .try_map(|r: PgRow| { - Ok(AddOutgoingResult { - result: r.try_get(0)?, - row_id: r.try_get_u64(1)?, + serialized!({ + let query = sqlx::query( + " + SELECT out_result, out_tx_row_id + FROM register_tx_out($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) + ", + ) + .bind(tx.code as i64) + .bind(&tx.amount) + .bind(&tx.subject) + .bind(tx.creditor.iban()) + .bind(&tx.creditor.name) + .bind_date(&tx.value_date); + let query = match kind { + TxOutKind::Simple => query + .bind(None::<&[u8]>) + .bind(None::<&str>) + .bind(None::<i64>), + TxOutKind::Bounce(bounced) => query + .bind(None::<&[u8]>) + .bind(None::<&str>) + .bind(*bounced as i64), + TxOutKind::Talerable(subject) => query + .bind(&subject.wtid) + .bind(subject.exchange_base_url.as_str()) + .bind(None::<i64>), + }; + query + .bind_timestamp(now) + .try_map(|r: PgRow| { + Ok(AddOutgoingResult { + result: r.try_get(0)?, + row_id: r.try_get_u64(1)?, + }) }) - }) - .fetch_one(db) - .await + .fetch_one(&mut *db) + }) } #[derive(Debug, PartialEq, Eq)] @@ -340,23 +344,24 @@ pub async fn register_tx_out_failure( bounced: Option<u32>, now: &Timestamp, ) -> sqlx::Result<OutFailureResult> { - sqlx::query( - " - SELECT out_new, out_initiated_id - FROM register_tx_out_failure($1, $2, $3) - ", - ) - .bind(code as i64) - .bind(bounced.map(|i| i as i32)) - .bind_timestamp(now) - .try_map(|r: PgRow| { - Ok(OutFailureResult { - new: r.try_get(0)?, - initiated_id: r.try_get::<Option<i64>, _>(1)?.map(|i| i as u64), + serialized!( + sqlx::query( + " + SELECT out_new, out_initiated_id + FROM register_tx_out_failure($1, $2, $3) + ", + ) + .bind(code as i64) + .bind(bounced.map(|i| i as i32)) + .bind_timestamp(now) + .try_map(|r: PgRow| { + Ok(OutFailureResult { + new: r.try_get(0)?, + initiated_id: r.try_get::<Option<i64>, _>(1)?.map(|i| i as u64), + }) }) - }) - .fetch_one(db) - .await + .fetch_one(&mut *db) + ) } #[derive(Debug, PartialEq, Eq)] @@ -375,40 +380,41 @@ pub struct Transfer { pub creditor: FullHuPayto, } -pub async fn make_transfer<'a>( - db: impl PgExecutor<'a>, +pub async fn make_transfer( + db: &PgPool, tx: &Transfer, now: &Timestamp, ) -> sqlx::Result<TransferResult> { let subject = format!("{} {}", tx.wtid, tx.exchange_base_url); - sqlx::query( - " - SELECT out_request_uid_reuse, out_wtid_reuse, out_initiated_row_id, out_initiated_at - FROM taler_transfer($1, $2, $3, $4, $5, $6, $7, $8) - ", - ) - .bind(&tx.request_uid) - .bind(&tx.wtid) - .bind(&subject) - .bind(tx.amount) - .bind(tx.exchange_base_url.as_str()) - .bind(tx.creditor.iban()) - .bind(&tx.creditor.name) - .bind_timestamp(now) - .try_map(|r: PgRow| { - Ok(if r.try_get_flag(0)? { - TransferResult::RequestUidReuse - } else if r.try_get_flag(1)? { - TransferResult::WtidReuse - } else { - TransferResult::Success { - id: r.try_get_u64(2)?, - initiated_at: r.try_get_timestamp(3)?, - } + serialized!( + sqlx::query( + " + SELECT out_request_uid_reuse, out_wtid_reuse, out_initiated_row_id, out_initiated_at + FROM taler_transfer($1, $2, $3, $4, $5, $6, $7, $8) + ", + ) + .bind(&tx.request_uid) + .bind(&tx.wtid) + .bind(&subject) + .bind(tx.amount) + .bind(tx.exchange_base_url.as_str()) + .bind(tx.creditor.iban()) + .bind(&tx.creditor.name) + .bind_timestamp(now) + .try_map(|r: PgRow| { + Ok(if r.try_get_flag(0)? { + TransferResult::RequestUidReuse + } else if r.try_get_flag(1)? { + TransferResult::WtidReuse + } else { + TransferResult::Success { + id: r.try_get_u64(2)?, + initiated_at: r.try_get_timestamp(3)?, + } + }) }) - }) - .fetch_one(db) - .await + .fetch_one(db) + ) } #[derive(Debug, PartialEq, Eq)] @@ -426,35 +432,36 @@ pub async fn register_bounce_tx_in( reason: &str, now: &Timestamp, ) -> sqlx::Result<BounceResult> { - sqlx::query( - " - SELECT out_tx_row_id, out_tx_new, out_bounce_row_id, out_bounce_new - FROM register_bounce_tx_in($1, $2, $3, $4, $5, $6, $7, $8, $9) - ", - ) - .bind(tx.code as i64) - .bind(&tx.amount) - .bind(&tx.subject) - .bind(tx.debtor.iban()) - .bind(&tx.debtor.name) - .bind_date(&tx.value_date) - .bind(amount) - .bind(reason) - .bind_timestamp(now) - .try_map(|r: PgRow| { - Ok(BounceResult { - tx_id: r.try_get_u64(0)?, - tx_new: r.try_get(1)?, - bounce_id: r.try_get_u64(2)?, - bounce_new: r.try_get(3)?, + serialized!( + sqlx::query( + " + SELECT out_tx_row_id, out_tx_new, out_bounce_row_id, out_bounce_new + FROM register_bounce_tx_in($1, $2, $3, $4, $5, $6, $7, $8, $9) + ", + ) + .bind(tx.code as i64) + .bind(&tx.amount) + .bind(&tx.subject) + .bind(tx.debtor.iban()) + .bind(&tx.debtor.name) + .bind_date(&tx.value_date) + .bind(amount) + .bind(reason) + .bind_timestamp(now) + .try_map(|r: PgRow| { + Ok(BounceResult { + tx_id: r.try_get_u64(0)?, + tx_new: r.try_get(1)?, + bounce_id: r.try_get_u64(2)?, + bounce_new: r.try_get(3)?, + }) }) - }) - .fetch_one(db) - .await + .fetch_one(&mut *db) + ) } -pub async fn transfer_page<'a>( - db: impl PgExecutor<'a>, +pub async fn transfer_page( + db: &PgPool, status: &Option<TransferState>, params: &Page, ) -> sqlx::Result<Vec<TransferListStatus>> { @@ -634,172 +641,174 @@ pub async fn revenue_history( .await } -pub async fn transfer_by_id<'a>( - db: impl PgExecutor<'a>, - id: u64, -) -> sqlx::Result<Option<TransferStatus>> { - sqlx::query( - " - SELECT - status, - status_msg, - amount, - exchange_base_url, - wtid, - credit_account, - credit_name, - initiated_at - FROM transfer - JOIN initiated USING (initiated_id) - WHERE initiated_id = $1 - ", - ) - .bind(id as i64) - .try_map(|r: PgRow| { - Ok(TransferStatus { - status: r.try_get(0)?, - status_msg: r.try_get(1)?, - amount: r.try_get_amount(2, &CURRENCY)?, - origin_exchange_url: r.try_get(3)?, - wtid: r.try_get(4)?, - credit_account: r.try_get_iban(5)?.as_full_payto(r.try_get(6)?), - timestamp: r.try_get_timestamp(7)?.into(), +pub async fn transfer_by_id(db: &PgPool, id: u64) -> sqlx::Result<Option<TransferStatus>> { + serialized!( + sqlx::query( + " + SELECT + status, + status_msg, + amount, + exchange_base_url, + wtid, + credit_account, + credit_name, + initiated_at + FROM transfer + JOIN initiated USING (initiated_id) + WHERE initiated_id = $1 + ", + ) + .bind(id as i64) + .try_map(|r: PgRow| { + Ok(TransferStatus { + status: r.try_get(0)?, + status_msg: r.try_get(1)?, + amount: r.try_get_amount(2, &CURRENCY)?, + origin_exchange_url: r.try_get(3)?, + wtid: r.try_get(4)?, + credit_account: r.try_get_iban(5)?.as_full_payto(r.try_get(6)?), + timestamp: r.try_get_timestamp(7)?.into(), + }) }) - }) - .fetch_optional(db) - .await + .fetch_optional(db) + ) } /** Get a batch of pending initiated transactions not attempted since [start] */ -pub async fn pending_batch<'a>( - db: impl PgExecutor<'a>, +pub async fn pending_batch( + db: &mut PgConnection, start: &Timestamp, ) -> sqlx::Result<Vec<Initiated>> { - sqlx::query( - " - SELECT initiated_id, amount, subject, credit_account, credit_name - FROM initiated - WHERE magnet_code IS NULL - AND status='pending' - AND (last_submitted IS NULL OR last_submitted < $1) - LIMIT 100 - ", - ) - .bind_timestamp(start) - .try_map(|r: PgRow| { - Ok(Initiated { - id: r.try_get_u64(0)?, - amount: r.try_get_amount(1, &CURRENCY)?, - subject: r.try_get(2)?, - creditor: FullHuPayto::new(r.try_get_parse(3)?, r.try_get(4)?), + serialized!( + sqlx::query( + " + SELECT initiated_id, amount, subject, credit_account, credit_name + FROM initiated + WHERE magnet_code IS NULL + AND status='pending' + AND (last_submitted IS NULL OR last_submitted < $1) + LIMIT 100 + ", + ) + .bind_timestamp(start) + .try_map(|r: PgRow| { + Ok(Initiated { + id: r.try_get_u64(0)?, + amount: r.try_get_amount(1, &CURRENCY)?, + subject: r.try_get(2)?, + creditor: FullHuPayto::new(r.try_get_parse(3)?, r.try_get(4)?), + }) }) - }) - .fetch_all(db) - .await + .fetch_all(&mut *db) + ) } /** Get an initiated transaction matching the given magnet [code] */ -pub async fn initiated_by_code<'a>( - db: impl PgExecutor<'a>, +pub async fn initiated_by_code( + db: &mut PgConnection, code: u64, ) -> sqlx::Result<Option<Initiated>> { - sqlx::query( - " - SELECT initiated_id, amount, subject, credit_account, credit_name - FROM initiated - WHERE magnet_code IS $1 - ", - ) - .bind(code as i64) - .try_map(|r: PgRow| { - Ok(Initiated { - id: r.try_get_u64(0)?, - amount: r.try_get_amount(1, &CURRENCY)?, - subject: r.try_get(2)?, - creditor: FullHuPayto::new(r.try_get_parse(3)?, r.try_get(4)?), + serialized!( + sqlx::query( + " + SELECT initiated_id, amount, subject, credit_account, credit_name + FROM initiated + WHERE magnet_code IS $1 + ", + ) + .bind(code as i64) + .try_map(|r: PgRow| { + Ok(Initiated { + id: r.try_get_u64(0)?, + amount: r.try_get_amount(1, &CURRENCY)?, + subject: r.try_get(2)?, + creditor: FullHuPayto::new(r.try_get_parse(3)?, r.try_get(4)?), + }) }) - }) - .fetch_optional(db) - .await + .fetch_optional(&mut *db) + ) } /** Update status of a successful submitted initiated transaction */ -pub async fn initiated_submit_success<'a>( - db: impl PgExecutor<'a>, +pub async fn initiated_submit_success( + db: &mut PgConnection, id: u64, timestamp: &Timestamp, magnet_code: u64, ) -> sqlx::Result<()> { - sqlx::query( - " - UPDATE initiated - SET status='pending', submission_counter=submission_counter+1, last_submitted=$1, magnet_code=$2 - WHERE initiated_id=$3 - " - ).bind_timestamp(timestamp) - .bind(magnet_code as i64) - .bind(id as i64) - .execute(db).await?; + serialized!( + sqlx::query( + " + UPDATE initiated + SET status='pending', submission_counter=submission_counter+1, last_submitted=$1, magnet_code=$2 + WHERE initiated_id=$3 + " + ).bind_timestamp(timestamp) + .bind(magnet_code as i64) + .bind(id as i64) + .execute(&mut *db) + )?; Ok(()) } /** Update status of a permanently failed initiated transaction */ -pub async fn initiated_submit_permanent_failure<'a>( - db: impl PgExecutor<'a>, +pub async fn initiated_submit_permanent_failure( + db: &mut PgConnection, id: u64, timestamp: &Timestamp, msg: &str, ) -> sqlx::Result<()> { - sqlx::query( - " - UPDATE initiated - SET status='permanent_failure', status_msg=$2 - WHERE initiated_id=$3 - ", - ) - .bind_timestamp(timestamp) - .bind(msg) - .bind(id as i64) - .execute(db) - .await?; + serialized!( + sqlx::query( + " + UPDATE initiated + SET status='permanent_failure', status_msg=$2 + WHERE initiated_id=$3 + ", + ) + .bind_timestamp(timestamp) + .bind(msg) + .bind(id as i64) + .execute(&mut *db) + )?; Ok(()) } /** Check if an initiated transaction exist for a magnet code */ -pub async fn initiated_exists_for_code<'a>( - db: impl PgExecutor<'a>, +pub async fn initiated_exists_for_code( + db: &mut PgConnection, code: u64, ) -> sqlx::Result<Option<u64>> { - sqlx::query("SELECT initiated_id FROM initiated WHERE magnet_code=$1") - .bind(code as i64) - .try_map(|r| Ok(r.try_get::<i64, _>(0)? as u64)) - .fetch_optional(db) - .await + serialized!( + sqlx::query("SELECT initiated_id FROM initiated WHERE magnet_code=$1") + .bind(code as i64) + .try_map(|r| Ok(r.try_get::<i64, _>(0)? as u64)) + .fetch_optional(&mut *db) + ) } /** Get JSON value from KV table */ -pub async fn kv_get<'a, T: DeserializeOwned + Unpin + Send>( - db: impl PgExecutor<'a>, +pub async fn kv_get<T: DeserializeOwned + Unpin + Send>( + db: &mut PgConnection, key: &str, ) -> sqlx::Result<Option<T>> { - sqlx::query("SELECT value FROM kv WHERE key=$1") - .bind(key) - .try_map(|r| Ok(r.try_get::<sqlx::types::Json<T>, _>(0)?.0)) - .fetch_optional(db) - .await + serialized!( + sqlx::query("SELECT value FROM kv WHERE key=$1") + .bind(key) + .try_map(|r| Ok(r.try_get::<sqlx::types::Json<T>, _>(0)?.0)) + .fetch_optional(&mut *db) + ) } /** Set JSON value in KV table */ -pub async fn kv_set<'a, T: Serialize>( - db: impl PgExecutor<'a>, - key: &str, - value: &T, -) -> sqlx::Result<()> { - sqlx::query("INSERT INTO kv (key, value) VALUES ($1, $2) ON CONFLICT (key) DO UPDATE SET value=EXCLUDED.value") - .bind(key) - .bind(sqlx::types::Json(value)) - .execute(db) - .await?; +pub async fn kv_set<T: Serialize>(db: &mut PgConnection, key: &str, value: &T) -> sqlx::Result<()> { + serialized!( + sqlx::query("INSERT INTO kv (key, value) VALUES ($1, $2) ON CONFLICT (key) DO UPDATE SET value=EXCLUDED.value") + .bind(key) + .bind(sqlx::types::Json(value)) + .execute(&mut *db) + )?; Ok(()) } @@ -807,7 +816,7 @@ pub async fn kv_set<'a, T: Serialize>( mod test { use jiff::{Span, Timestamp, Zoned}; use serde_json::json; - use sqlx::{PgConnection, PgPool, Postgres, pool::PoolConnection, postgres::PgRow}; + use sqlx::{PgPool, Postgres, pool::PoolConnection, postgres::PgRow}; use taler_api::{ db::TypeHelper, notification::dummy_listen, @@ -850,17 +859,13 @@ mod test { }); assert_eq!( - kv_get::<serde_json::Value>(&mut *db, "value") - .await - .unwrap(), + kv_get::<serde_json::Value>(&mut db, "value").await.unwrap(), None ); - kv_set(&mut *db, "value", &value).await.unwrap(); - kv_set(&mut *db, "value", &value).await.unwrap(); + kv_set(&mut db, "value", &value).await.unwrap(); + kv_set(&mut db, "value", &value).await.unwrap(); assert_eq!( - kv_get::<serde_json::Value>(&mut *db, "value") - .await - .unwrap(), + kv_get::<serde_json::Value>(&mut db, "value").await.unwrap(), Some(value) ); } @@ -869,11 +874,8 @@ mod test { async fn tx_in() { let (mut db, pool) = setup().await; - async fn routine( - db: &mut PgConnection, - first: &Option<IncomingSubject>, - second: &Option<IncomingSubject>, - ) { + let mut routine = async |first: &Option<IncomingSubject>, + second: &Option<IncomingSubject>| { let (id, code) = sqlx::query("SELECT count(*) + 1, COALESCE(max(magnet_code), 0) + 20 FROM tx_in") .try_map(|r: PgRow| Ok((r.try_get_u64(0)?, r.try_get_u64(1)?))) @@ -895,7 +897,7 @@ mod test { }; // Insert assert_eq!( - register_tx_in(db, &tx, first, &now) + register_tx_in(&mut db, &tx, first, &now) .await .expect("register tx in"), AddIncomingResult::Success { @@ -907,7 +909,7 @@ mod test { // Idempotent assert_eq!( register_tx_in( - db, + &mut db, &TxIn { value_date: later, ..tx.clone() @@ -926,7 +928,7 @@ mod test { // Many assert_eq!( register_tx_in( - db, + &mut db, &TxIn { code: code + 1, value_date: later, @@ -943,7 +945,7 @@ mod test { valued_at: later } ); - } + }; // Empty db assert_eq!( @@ -960,11 +962,10 @@ mod test { ); // Regular transaction - routine(&mut db, &None, &None).await; + routine(&None, &None).await; // Reserve transaction routine( - &mut db, &Some(IncomingSubject::Reserve(EddsaPublicKey::rand())), &Some(IncomingSubject::Reserve(EddsaPublicKey::rand())), ) @@ -972,7 +973,6 @@ mod test { // Kyc transaction routine( - &mut db, &Some(IncomingSubject::Kyc(EddsaPublicKey::rand())), &Some(IncomingSubject::Kyc(EddsaPublicKey::rand())), ) @@ -1072,7 +1072,7 @@ mod test { async fn tx_out() { let (mut db, pool) = setup().await; - async fn routine(db: &mut PgConnection, first: &TxOutKind, second: &TxOutKind) { + let mut routine = async |first: &TxOutKind, second: &TxOutKind| { let (id, code) = sqlx::query("SELECT count(*) + 1, COALESCE(max(magnet_code), 0) + 20 FROM tx_out") .try_map(|r: PgRow| Ok((r.try_get_u64(0)?, r.try_get_u64(1)?))) @@ -1094,7 +1094,7 @@ mod test { }; assert!(matches!( make_transfer( - &mut *db, + &pool, &db::Transfer { request_uid: HashCode::rand(), amount: decimal("10"), @@ -1108,13 +1108,13 @@ mod test { .unwrap(), TransferResult::Success { .. } )); - db::initiated_submit_success(&mut *db, 1, &Timestamp::now(), tx.code) + db::initiated_submit_success(&mut db, 1, &Timestamp::now(), tx.code) .await .expect("status success"); // Insert assert_eq!( - register_tx_out(&mut *db, &tx, first, &now) + register_tx_out(&mut db, &tx, first, &now) .await .expect("register tx out"), AddOutgoingResult { @@ -1125,7 +1125,7 @@ mod test { // Idempotent assert_eq!( register_tx_out( - &mut *db, + &mut db, &TxOut { value_date: later, ..tx.clone() @@ -1143,7 +1143,7 @@ mod test { // Recovered assert_eq!( register_tx_out( - &mut *db, + &mut db, &TxOut { code: code + 1, value_date: later, @@ -1159,7 +1159,7 @@ mod test { row_id: id + 1, } ); - } + }; // Empty db assert_eq!( @@ -1170,18 +1170,17 @@ mod test { ); // Regular transaction - routine(&mut db, &TxOutKind::Simple, &TxOutKind::Simple).await; + routine(&TxOutKind::Simple, &TxOutKind::Simple).await; // Talerable transaction routine( - &mut db, &TxOutKind::Talerable(OutgoingSubject::rand()), &TxOutKind::Talerable(OutgoingSubject::rand()), ) .await; // Bounced transaction - routine(&mut db, &TxOutKind::Bounce(21), &TxOutKind::Bounce(42)).await; + routine(&TxOutKind::Bounce(21), &TxOutKind::Bounce(42)).await; // History assert_eq!( @@ -1195,7 +1194,7 @@ mod test { #[tokio::test] async fn tx_out_failure() { - let (mut db, _) = setup().await; + let (mut db, pool) = setup().await; let now = now_sql_stable_ts(); @@ -1229,13 +1228,13 @@ mod test { }; let payto = magnet_payto("payto://iban/HU30162000031000163100000000?receiver-name=name"); assert_eq!( - make_transfer(&mut *db, &req, &now).await.unwrap(), + make_transfer(&pool, &req, &now).await.unwrap(), TransferResult::Success { id: 1, initiated_at: now } ); - db::initiated_submit_success(&mut *db, 1, &Timestamp::now(), 34) + db::initiated_submit_success(&mut db, 1, &Timestamp::now(), 34) .await .expect("status success"); assert_eq!( @@ -1299,12 +1298,12 @@ mod test { #[tokio::test] async fn transfer() { - let (mut db, _) = setup().await; + let (_, pool) = setup().await; // Empty db - assert_eq!(db::transfer_by_id(&mut *db, 0).await.unwrap(), None); + assert_eq!(db::transfer_by_id(&pool, 0).await.unwrap(), None); assert_eq!( - db::transfer_page(&mut *db, &None, &Page::default()) + db::transfer_page(&pool, &None, &Page::default()) .await .unwrap(), Vec::new() @@ -1321,7 +1320,7 @@ mod test { let later = now + Span::new().hours(2); // Insert assert_eq!( - make_transfer(&mut *db, &req, &now).await.expect("transfer"), + make_transfer(&pool, &req, &now).await.expect("transfer"), TransferResult::Success { id: 1, initiated_at: now @@ -1329,9 +1328,7 @@ mod test { ); // Idempotent assert_eq!( - make_transfer(&mut *db, &req, &later) - .await - .expect("transfer"), + make_transfer(&pool, &req, &later).await.expect("transfer"), TransferResult::Success { id: 1, initiated_at: now @@ -1340,7 +1337,7 @@ mod test { // Request UID reuse assert_eq!( make_transfer( - &mut *db, + &pool, &db::Transfer { wtid: ShortHashCode::rand(), ..req.clone() @@ -1354,7 +1351,7 @@ mod test { // wtid reuse assert_eq!( make_transfer( - &mut *db, + &pool, &db::Transfer { request_uid: HashCode::rand(), ..req.clone() @@ -1368,7 +1365,7 @@ mod test { // Many assert_eq!( make_transfer( - &mut *db, + &pool, &db::Transfer { request_uid: HashCode::rand(), wtid: ShortHashCode::rand(), @@ -1385,11 +1382,11 @@ mod test { ); // Get - assert!(db::transfer_by_id(&mut *db, 1).await.unwrap().is_some()); - assert!(db::transfer_by_id(&mut *db, 2).await.unwrap().is_some()); - assert!(db::transfer_by_id(&mut *db, 3).await.unwrap().is_none()); + assert!(db::transfer_by_id(&pool, 1).await.unwrap().is_some()); + assert!(db::transfer_by_id(&pool, 2).await.unwrap().is_some()); + assert!(db::transfer_by_id(&pool, 3).await.unwrap().is_none()); assert_eq!( - db::transfer_page(&mut *db, &None, &Page::default()) + db::transfer_page(&pool, &None, &Page::default()) .await .unwrap() .len(), @@ -1407,7 +1404,7 @@ mod test { let date = Zoned::now().date(); // Empty db - assert!(db::pending_batch(&mut *db, &now).await.unwrap().is_empty()); + assert!(db::pending_batch(&mut db, &now).await.unwrap().is_empty()); // Insert assert_eq!( @@ -1537,7 +1534,7 @@ mod test { // Batch assert_eq!( - db::pending_batch(&mut *db, &now).await.unwrap(), + db::pending_batch(&mut db, &now).await.unwrap(), &[ Initiated { id: 1, @@ -1560,23 +1557,23 @@ mod test { let (mut db, _) = setup().await; // Unknown transfer - db::initiated_submit_permanent_failure(&mut *db, 1, &Timestamp::now(), "msg") + db::initiated_submit_permanent_failure(&mut db, 1, &Timestamp::now(), "msg") .await .unwrap(); - db::initiated_submit_success(&mut *db, 1, &Timestamp::now(), 12) + db::initiated_submit_success(&mut db, 1, &Timestamp::now(), 12) .await .unwrap(); } #[tokio::test] async fn batch() { - let (mut db, _) = setup().await; + let (mut db, pool) = setup().await; let start = Timestamp::now(); let magnet_payto = magnet_payto("payto://iban/HU30162000031000163100000000?receiver-name=name"); // Empty db - let pendings = db::pending_batch(&mut *db, &start) + let pendings = db::pending_batch(&mut db, &start) .await .expect("pending_batch"); assert_eq!(pendings.len(), 0); @@ -1584,7 +1581,7 @@ mod test { // Some transfers for i in 0..3 { make_transfer( - &mut *db, + &pool, &db::Transfer { request_uid: HashCode::rand(), amount: decimal(format!("{}", i + 1)), @@ -1597,7 +1594,7 @@ mod test { .await .expect("transfer"); } - let pendings = db::pending_batch(&mut *db, &start) + let pendings = db::pending_batch(&mut db, &start) .await .expect("pending_batch"); assert_eq!(pendings.len(), 3); @@ -1605,7 +1602,7 @@ mod test { // Max 100 txs in batch for i in 0..100 { make_transfer( - &mut *db, + &pool, &db::Transfer { request_uid: HashCode::rand(), amount: decimal(format!("{}", i + 1)), @@ -1618,29 +1615,29 @@ mod test { .await .expect("transfer"); } - let pendings = db::pending_batch(&mut *db, &start) + let pendings = db::pending_batch(&mut db, &start) .await .expect("pending_batch"); assert_eq!(pendings.len(), 100); // Skip uploaded for i in 0..=10 { - db::initiated_submit_success(&mut *db, i, &Timestamp::now(), i) + db::initiated_submit_success(&mut db, i, &Timestamp::now(), i) .await .expect("status success"); } - let pendings = db::pending_batch(&mut *db, &start) + let pendings = db::pending_batch(&mut db, &start) .await .expect("pending_batch"); assert_eq!(pendings.len(), 93); // Skip failed for i in 0..=10 { - db::initiated_submit_permanent_failure(&mut *db, 10 + i, &Timestamp::now(), "failure") + db::initiated_submit_permanent_failure(&mut db, 10 + i, &Timestamp::now(), "failure") .await .expect("status failure"); } - let pendings = db::pending_batch(&mut *db, &start) + let pendings = db::pending_batch(&mut db, &start) .await .expect("pending_batch"); assert_eq!(pendings.len(), 83);