taler-rust

GNU Taler code in Rust. Largely core banking integrations.
Log | Files | Refs | Submodules | README | LICENSE

db.rs (50213B)


      1 /*
      2   This file is part of TALER
      3   Copyright (C) 2025, 2026 Taler Systems SA
      4 
      5   TALER is free software; you can redistribute it and/or modify it under the
      6   terms of the GNU Affero General Public License as published by the Free Software
      7   Foundation; either version 3, or (at your option) any later version.
      8 
      9   TALER is distributed in the hope that it will be useful, but WITHOUT ANY
     10   WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
     11   A PARTICULAR PURPOSE.  See the GNU Affero General Public License for more details.
     12 
     13   You should have received a copy of the GNU Affero General Public License along with
     14   TALER; see the file COPYING.  If not, see <http://www.gnu.org/licenses/>
     15 */
     16 
     17 use std::fmt::Display;
     18 
     19 use compact_str::CompactString;
     20 use jiff::{Timestamp, civil::Date, tz::TimeZone};
     21 use serde::{Serialize, de::DeserializeOwned};
     22 use sqlx::{PgConnection, PgPool, QueryBuilder, Row, postgres::PgRow};
     23 use taler_api::{
     24     db::{BindHelper, TypeHelper, history, page},
     25     serialized,
     26     subject::{IncomingSubject, OutgoingSubject, fmt_out_subject},
     27 };
     28 use taler_common::{
     29     api::{
     30         HashCode, ShortHashCode,
     31         params::{History, Page},
     32         prepared::{RegistrationRequest, Unregistration},
     33         revenue::RevenueIncomingBankTransaction,
     34         wire::{
     35             IncomingBankTransaction, OutgoingBankTransaction, TransferListStatus, TransferState,
     36             TransferStatus,
     37         },
     38     },
     39     config::Config,
     40     db::IncomingType,
     41     types::{
     42         amount::{Amount, Decimal},
     43         payto::PaytoImpl as _,
     44     },
     45 };
     46 use tokio::sync::watch::{Receiver, Sender};
     47 use url::Url;
     48 
     49 use crate::{FullHuPayto, config::parse_db_cfg, constants::CURR, magnet_api::types::TxStatus};
     50 
     51 const SCHEMA: &str = "magnet_bank";
     52 
     53 pub async fn pool(cfg: &Config) -> anyhow::Result<PgPool> {
     54     let db = parse_db_cfg(cfg)?;
     55     let pool = taler_common::db::pool(db.cfg, SCHEMA).await?;
     56     Ok(pool)
     57 }
     58 
     59 pub async fn dbinit(cfg: &Config, reset: bool) -> anyhow::Result<PgPool> {
     60     let db_cfg = parse_db_cfg(cfg)?;
     61     let pool = taler_common::db::pool(db_cfg.cfg, SCHEMA).await?;
     62     let mut db = pool.acquire().await?;
     63     taler_common::db::dbinit(&mut db, db_cfg.sql_dir.as_ref(), "magnet-bank", reset).await?;
     64     Ok(pool)
     65 }
     66 
     67 pub async fn notification_listener(
     68     pool: PgPool,
     69     in_channel: Sender<i64>,
     70     taler_in_channel: Sender<i64>,
     71     out_channel: Sender<i64>,
     72     taler_out_channel: Sender<i64>,
     73 ) {
     74     taler_api::notification::notification_listener!(&pool,
     75         "tx_in" => (row_id: i64) {
     76             in_channel.send_replace(row_id);
     77         },
     78         "taler_in" => (row_id: i64) {
     79             taler_in_channel.send_replace(row_id);
     80         },
     81         "tx_out" => (row_id: i64) {
     82             out_channel.send_replace(row_id);
     83         },
     84         "taler_out" => (row_id: i64) {
     85             taler_out_channel.send_replace(row_id);
     86         }
     87     )
     88 }
     89 
     90 #[derive(Debug, Clone)]
     91 pub struct TxIn {
     92     pub code: u64,
     93     pub amount: Amount,
     94     pub subject: Box<str>,
     95     pub debtor: FullHuPayto,
     96     pub value_date: Date,
     97     pub status: TxStatus,
     98 }
     99 
    100 impl Display for TxIn {
    101     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
    102         let Self {
    103             code,
    104             amount,
    105             subject,
    106             debtor,
    107             value_date,
    108             status,
    109         } = self;
    110         write!(
    111             f,
    112             "{value_date} {code} {amount} ({} {}) {status:?} '{subject}'",
    113             debtor.bban(),
    114             debtor.name
    115         )
    116     }
    117 }
    118 
    119 #[derive(Debug, Clone)]
    120 pub struct TxOut {
    121     pub code: u64,
    122     pub amount: Amount,
    123     pub subject: Box<str>,
    124     pub creditor: FullHuPayto,
    125     pub value_date: Date,
    126     pub status: TxStatus,
    127 }
    128 
    129 impl Display for TxOut {
    130     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
    131         let Self {
    132             code,
    133             amount,
    134             subject,
    135             creditor,
    136             value_date,
    137             status,
    138         } = self;
    139         write!(
    140             f,
    141             "{value_date} {code} {amount} ({} {}) {status:?} '{subject}'",
    142             creditor.bban(),
    143             &creditor.name
    144         )
    145     }
    146 }
    147 
    148 #[derive(Debug, PartialEq, Eq)]
    149 pub struct Initiated {
    150     pub id: u64,
    151     pub amount: Amount,
    152     pub subject: Box<str>,
    153     pub creditor: FullHuPayto,
    154 }
    155 
    156 impl Display for Initiated {
    157     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
    158         let Self {
    159             id,
    160             amount,
    161             subject,
    162             creditor,
    163         } = self;
    164         write!(
    165             f,
    166             "{id} {amount} ({} {}) '{subject}'",
    167             creditor.bban(),
    168             &creditor.name
    169         )
    170     }
    171 }
    172 
    173 #[derive(Debug, Clone)]
    174 pub struct TxInAdmin {
    175     pub amount: Amount,
    176     pub subject: String,
    177     pub debtor: FullHuPayto,
    178     pub metadata: IncomingSubject,
    179 }
    180 
    181 /// Lock the database for worker execution
    182 pub async fn worker_lock(e: &mut PgConnection) -> sqlx::Result<bool> {
    183     sqlx::query("SELECT pg_try_advisory_lock(42)")
    184         .try_map(|r: PgRow| r.try_get(0))
    185         .fetch_one(e)
    186         .await
    187 }
    188 
    189 #[derive(Debug, PartialEq, Eq)]
    190 pub enum AddIncomingResult {
    191     Success {
    192         new: bool,
    193         pending: bool,
    194         row_id: u64,
    195         valued_at: Date,
    196     },
    197     ReservePubReuse,
    198     UnknownMapping,
    199     MappingReuse,
    200 }
    201 
    202 pub async fn register_tx_in_admin(
    203     db: &PgPool,
    204     tx: &TxInAdmin,
    205     now: &Timestamp,
    206 ) -> sqlx::Result<AddIncomingResult> {
    207     serialized!(
    208         sqlx::query(
    209             "
    210                 SELECT out_reserve_pub_reuse, out_mapping_reuse, out_unknown_mapping, out_tx_row_id, out_valued_at, out_new, out_pending
    211                 FROM register_tx_in(NULL, $1, $2, $3, $4, $5, $6, $7, $5)
    212             ",
    213         )
    214         .bind(tx.amount)
    215         .bind(&tx.subject)
    216         .bind(tx.debtor.iban())
    217         .bind(&tx.debtor.name)
    218         .bind_date(&now.to_zoned(TimeZone::UTC).date())
    219         .bind(tx.metadata.ty())
    220         .bind(tx.metadata.key())
    221         .try_map(|r: PgRow| {
    222             Ok(if r.try_get_flag(0)? {
    223                 AddIncomingResult::ReservePubReuse
    224             } else if r.try_get_flag(1)? {
    225                 AddIncomingResult::MappingReuse
    226             } else if r.try_get_flag(2)? {
    227                 AddIncomingResult::UnknownMapping
    228             } else {
    229                 AddIncomingResult::Success {
    230                     row_id: r.try_get_u64(3)?,
    231                     valued_at: r.try_get_date(4)?,
    232                     new: r.try_get(5)?,
    233                     pending: r.try_get(6)?
    234                 }
    235             })
    236         })
    237         .fetch_one(db)
    238     )
    239 }
    240 
    241 pub async fn register_tx_in(
    242     db: &mut PgConnection,
    243     tx: &TxIn,
    244     subject: &Option<IncomingSubject>,
    245     now: &Timestamp,
    246 ) -> sqlx::Result<AddIncomingResult> {
    247     serialized!(
    248         sqlx::query(
    249             "
    250                 SELECT out_reserve_pub_reuse, out_mapping_reuse, out_unknown_mapping, out_tx_row_id, out_valued_at, out_new, out_pending
    251                 FROM register_tx_in($1, $2, $3, $4, $5, $6, $7, $8, $9)
    252             ",
    253         )
    254         .bind(tx.code as i64)
    255         .bind(tx.amount)
    256         .bind(&tx.subject)
    257         .bind(tx.debtor.iban())
    258         .bind(&tx.debtor.name)
    259         .bind_date(&tx.value_date)
    260         .bind(subject.as_ref().map(|it| it.ty()))
    261         .bind(subject.as_ref().map(|it| it.key()))
    262         .bind_timestamp(now)
    263         .try_map(|r: PgRow| {
    264             Ok(if r.try_get_flag(0)? {
    265                 AddIncomingResult::ReservePubReuse
    266             } else if r.try_get_flag(1)? {
    267                 AddIncomingResult::MappingReuse
    268             } else if r.try_get_flag(2)? {
    269                 AddIncomingResult::UnknownMapping
    270             } else {
    271                 AddIncomingResult::Success {
    272                     row_id: r.try_get_u64(3)?,
    273                     valued_at: r.try_get_date(4)?,
    274                     new: r.try_get(5)?,
    275                     pending: r.try_get(6)?
    276                 }
    277             })
    278         })
    279         .fetch_one(&mut *db)
    280     )
    281 }
    282 
    283 #[derive(Debug)]
    284 pub enum TxOutKind {
    285     Simple,
    286     Bounce(u32),
    287     Talerable(OutgoingSubject),
    288 }
    289 
    290 #[derive(Debug, Clone, Copy, PartialEq, Eq, sqlx::Type)]
    291 #[allow(non_camel_case_types)]
    292 #[sqlx(type_name = "register_result")]
    293 pub enum RegisterResult {
    294     /// Already registered
    295     idempotent,
    296     /// Initiated transaction
    297     known,
    298     /// Recovered unknown outgoing transaction
    299     recovered,
    300 }
    301 
    302 #[derive(Debug, PartialEq, Eq)]
    303 pub struct AddOutgoingResult {
    304     pub result: RegisterResult,
    305     pub row_id: u64,
    306 }
    307 
    308 pub async fn register_tx_out(
    309     db: &mut PgConnection,
    310     tx: &TxOut,
    311     kind: &TxOutKind,
    312     now: &Timestamp,
    313 ) -> sqlx::Result<AddOutgoingResult> {
    314     serialized!({
    315         let query = sqlx::query(
    316             "
    317                 SELECT out_result, out_tx_row_id
    318                 FROM register_tx_out($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
    319             ",
    320         )
    321         .bind(tx.code as i64)
    322         .bind(tx.amount)
    323         .bind(&tx.subject)
    324         .bind(tx.creditor.iban())
    325         .bind(&tx.creditor.name)
    326         .bind_date(&tx.value_date);
    327         let query = match kind {
    328             TxOutKind::Simple => query
    329                 .bind(None::<&[u8]>)
    330                 .bind(None::<&str>)
    331                 .bind(None::<&str>)
    332                 .bind(None::<i64>),
    333             TxOutKind::Bounce(bounced) => query
    334                 .bind(None::<&[u8]>)
    335                 .bind(None::<&str>)
    336                 .bind(None::<&str>)
    337                 .bind(*bounced as i64),
    338             TxOutKind::Talerable(subject) => query
    339                 .bind(&subject.wtid)
    340                 .bind(subject.exchange_base_url.as_str())
    341                 .bind(&subject.metadata)
    342                 .bind(None::<i64>),
    343         };
    344         query
    345             .bind_timestamp(now)
    346             .try_map(|r: PgRow| {
    347                 Ok(AddOutgoingResult {
    348                     result: r.try_get(0)?,
    349                     row_id: r.try_get_u64(1)?,
    350                 })
    351             })
    352             .fetch_one(&mut *db)
    353     })
    354 }
    355 
    356 #[derive(Debug, PartialEq, Eq)]
    357 pub struct OutFailureResult {
    358     pub initiated_id: Option<u64>,
    359     pub new: bool,
    360 }
    361 
    362 pub async fn register_tx_out_failure(
    363     db: &mut PgConnection,
    364     code: u64,
    365     bounced: Option<u32>,
    366     now: &Timestamp,
    367 ) -> sqlx::Result<OutFailureResult> {
    368     serialized!(
    369         sqlx::query(
    370             "
    371                 SELECT out_new, out_initiated_id
    372                 FROM register_tx_out_failure($1, $2, $3)
    373             ",
    374         )
    375         .bind(code as i64)
    376         .bind(bounced.map(|i| i as i32))
    377         .bind_timestamp(now)
    378         .try_map(|r: PgRow| {
    379             Ok(OutFailureResult {
    380                 new: r.try_get(0)?,
    381                 initiated_id: r.try_get::<Option<i64>, _>(1)?.map(|i| i as u64),
    382             })
    383         })
    384         .fetch_one(&mut *db)
    385     )
    386 }
    387 
    388 #[derive(Debug, PartialEq, Eq)]
    389 pub enum TransferResult {
    390     Success { id: u64, initiated_at: Timestamp },
    391     RequestUidReuse,
    392     WtidReuse,
    393 }
    394 
    395 #[derive(Debug, Clone)]
    396 pub struct Transfer {
    397     pub request_uid: HashCode,
    398     pub amount: Decimal,
    399     pub exchange_base_url: Url,
    400     pub metadata: Option<CompactString>,
    401     pub wtid: ShortHashCode,
    402     pub creditor: FullHuPayto,
    403 }
    404 
    405 pub async fn make_transfer(
    406     db: &PgPool,
    407     tx: &Transfer,
    408     now: &Timestamp,
    409 ) -> sqlx::Result<TransferResult> {
    410     let subject = fmt_out_subject(&tx.wtid, &tx.exchange_base_url, tx.metadata.as_deref());
    411     serialized!(
    412         sqlx::query(
    413             "
    414                 SELECT out_request_uid_reuse, out_wtid_reuse, out_initiated_row_id, out_initiated_at
    415                 FROM taler_transfer($1, $2, $3, $4, $5, $6, $7, $8, $9)
    416             ",
    417         )
    418         .bind(&tx.request_uid)
    419         .bind(&tx.wtid)
    420         .bind(&subject)
    421         .bind(tx.amount)
    422         .bind(tx.exchange_base_url.as_str())
    423         .bind(&tx.metadata)
    424         .bind(tx.creditor.iban())
    425         .bind(&tx.creditor.name)
    426         .bind_timestamp(now)
    427         .try_map(|r: PgRow| {
    428             Ok(if r.try_get_flag(0)? {
    429                 TransferResult::RequestUidReuse
    430             } else if r.try_get_flag(1)? {
    431                 TransferResult::WtidReuse
    432             } else {
    433                 TransferResult::Success {
    434                     id: r.try_get_u64(2)?,
    435                     initiated_at: r.try_get_timestamp(3)?,
    436                 }
    437             })
    438         })
    439         .fetch_one(db)
    440     )
    441 }
    442 
    443 #[derive(Debug, PartialEq, Eq)]
    444 pub struct BounceResult {
    445     pub tx_id: u64,
    446     pub tx_new: bool,
    447     pub bounce_id: u64,
    448     pub bounce_new: bool,
    449 }
    450 
    451 pub async fn register_bounce_tx_in(
    452     db: &mut PgConnection,
    453     tx: &TxIn,
    454     reason: &str,
    455     now: &Timestamp,
    456 ) -> sqlx::Result<BounceResult> {
    457     serialized!(
    458         sqlx::query(
    459             "
    460                 SELECT out_tx_row_id, out_tx_new, out_bounce_row_id, out_bounce_new
    461                 FROM register_bounce_tx_in($1, $2, $3, $4, $5, $6, $7, $8)
    462             ",
    463         )
    464         .bind(tx.code as i64)
    465         .bind(tx.amount)
    466         .bind(&tx.subject)
    467         .bind(tx.debtor.iban())
    468         .bind(&tx.debtor.name)
    469         .bind_date(&tx.value_date)
    470         .bind(reason)
    471         .bind_timestamp(now)
    472         .try_map(|r: PgRow| {
    473             Ok(BounceResult {
    474                 tx_id: r.try_get_u64(0)?,
    475                 tx_new: r.try_get(1)?,
    476                 bounce_id: r.try_get_u64(2)?,
    477                 bounce_new: r.try_get(3)?,
    478             })
    479         })
    480         .fetch_one(&mut *db)
    481     )
    482 }
    483 
    484 pub async fn transfer_page(
    485     db: &PgPool,
    486     status: &Option<TransferState>,
    487     params: &Page,
    488 ) -> sqlx::Result<Vec<TransferListStatus>> {
    489     page(
    490         db,
    491         params,
    492         "initiated_id",
    493         || {
    494             let mut builder = QueryBuilder::new(
    495                 "
    496                     SELECT
    497                         initiated_id,
    498                         status,
    499                         amount,
    500                         credit_account,
    501                         credit_name,
    502                         initiated_at
    503                     FROM transfer
    504                     JOIN initiated USING (initiated_id)
    505                     WHERE
    506                 ",
    507             );
    508             if let Some(status) = status {
    509                 builder.push(" status = ").push_bind(status).push(" AND ");
    510             }
    511             builder
    512         },
    513         |r: PgRow| {
    514             Ok(TransferListStatus {
    515                 row_id: r.try_get_u64(0)?,
    516                 status: r.try_get(1)?,
    517                 amount: r.try_get_amount(2, &CURR)?,
    518                 credit_account: r.try_get_iban(3)?.as_full_uri(r.try_get(4)?),
    519                 timestamp: r.try_get_timestamp(5)?.into(),
    520             })
    521         },
    522     )
    523     .await
    524 }
    525 
    526 pub async fn outgoing_history(
    527     db: &PgPool,
    528     params: &History,
    529     listen: impl FnOnce() -> Receiver<i64>,
    530 ) -> sqlx::Result<Vec<OutgoingBankTransaction>> {
    531     history(
    532         db,
    533         "tx_out_id",
    534         params,
    535         listen,
    536         || {
    537             QueryBuilder::new(
    538                 "
    539                 SELECT
    540                     tx_out_id,
    541                     amount,
    542                     credit_account,
    543                     credit_name,
    544                     valued_at,
    545                     exchange_base_url,
    546                     metadata,
    547                     wtid
    548                 FROM taler_out
    549                 JOIN tx_out USING (tx_out_id)
    550                 WHERE
    551             ",
    552             )
    553         },
    554         |r: PgRow| {
    555             Ok(OutgoingBankTransaction {
    556                 row_id: r.try_get_u64(0)?,
    557                 amount: r.try_get_amount(1, &CURR)?,
    558                 debit_fee: None,
    559                 credit_account: r.try_get_iban(2)?.as_full_uri(r.try_get(3)?),
    560                 date: r.try_get_timestamp(4)?.into(),
    561                 exchange_base_url: r.try_get_url(5)?,
    562                 metadata: r.try_get(6)?,
    563                 wtid: r.try_get(7)?,
    564             })
    565         },
    566     )
    567     .await
    568 }
    569 
    570 pub async fn incoming_history(
    571     db: &PgPool,
    572     params: &History,
    573     listen: impl FnOnce() -> Receiver<i64>,
    574 ) -> sqlx::Result<Vec<IncomingBankTransaction>> {
    575     history(
    576         db,
    577         "tx_in_id",
    578         params,
    579         listen,
    580         || {
    581             QueryBuilder::new(
    582                 "
    583                 SELECT
    584                     type,
    585                     tx_in_id,
    586                     amount,
    587                     debit_account,
    588                     debit_name,
    589                     valued_at,
    590                     metadata,
    591                     authorization_pub,
    592                     authorization_sig
    593                 FROM taler_in
    594                 JOIN tx_in USING (tx_in_id)
    595                 WHERE
    596             ",
    597             )
    598         },
    599         |r: PgRow| {
    600             Ok(match r.try_get(0)? {
    601                 IncomingType::reserve => IncomingBankTransaction::Reserve {
    602                     row_id: r.try_get_u64(1)?,
    603                     amount: r.try_get_amount(2, &CURR)?,
    604                     credit_fee: None,
    605                     debit_account: r.try_get_iban(3)?.as_full_uri(r.try_get(4)?),
    606                     date: r.try_get_timestamp(5)?.into(),
    607                     reserve_pub: r.try_get(6)?,
    608                     authorization_pub: r.try_get(7)?,
    609                     authorization_sig: r.try_get(8)?,
    610                 },
    611                 IncomingType::kyc => IncomingBankTransaction::Kyc {
    612                     row_id: r.try_get_u64(1)?,
    613                     amount: r.try_get_amount(2, &CURR)?,
    614                     credit_fee: None,
    615                     debit_account: r.try_get_iban(3)?.as_full_uri(r.try_get(4)?),
    616                     date: r.try_get_timestamp(5)?.into(),
    617                     account_pub: r.try_get(6)?,
    618                     authorization_pub: r.try_get(7)?,
    619                     authorization_sig: r.try_get(8)?,
    620                 },
    621                 IncomingType::map => unimplemented!("MAP are never listed in the history"),
    622             })
    623         },
    624     )
    625     .await
    626 }
    627 
    628 pub async fn revenue_history(
    629     db: &PgPool,
    630     params: &History,
    631     listen: impl FnOnce() -> Receiver<i64>,
    632 ) -> sqlx::Result<Vec<RevenueIncomingBankTransaction>> {
    633     history(
    634         db,
    635         "tx_in_id",
    636         params,
    637         listen,
    638         || {
    639             QueryBuilder::new(
    640                 "
    641                 SELECT
    642                     tx_in_id,
    643                     valued_at,
    644                     amount,
    645                     debit_account,
    646                     debit_name,
    647                     subject
    648                 FROM tx_in
    649                 WHERE
    650             ",
    651             )
    652         },
    653         |r: PgRow| {
    654             Ok(RevenueIncomingBankTransaction {
    655                 row_id: r.try_get_u64(0)?,
    656                 date: r.try_get_timestamp(1)?.into(),
    657                 amount: r.try_get_amount(2, &CURR)?,
    658                 credit_fee: None,
    659                 debit_account: r.try_get_iban(3)?.as_full_uri(r.try_get(4)?),
    660                 subject: r.try_get(5)?,
    661             })
    662         },
    663     )
    664     .await
    665 }
    666 
    667 pub async fn transfer_by_id(db: &PgPool, id: u64) -> sqlx::Result<Option<TransferStatus>> {
    668     serialized!(
    669         sqlx::query(
    670             "
    671                 SELECT
    672                     status,
    673                     status_msg,
    674                     amount,
    675                     exchange_base_url,
    676                     metadata,
    677                     wtid,
    678                     credit_account,
    679                     credit_name,
    680                     initiated_at
    681                 FROM transfer
    682                 JOIN initiated USING (initiated_id)
    683                 WHERE initiated_id = $1
    684             ",
    685         )
    686         .bind(id as i64)
    687         .try_map(|r: PgRow| {
    688             Ok(TransferStatus {
    689                 status: r.try_get(0)?,
    690                 status_msg: r.try_get(1)?,
    691                 amount: r.try_get_amount(2, &CURR)?,
    692                 origin_exchange_url: r.try_get(3)?,
    693                 metadata: r.try_get(4)?,
    694                 wtid: r.try_get(5)?,
    695                 credit_account: r.try_get_iban(6)?.as_full_uri(r.try_get(7)?),
    696                 timestamp: r.try_get_timestamp(8)?.into(),
    697             })
    698         })
    699         .fetch_optional(db)
    700     )
    701 }
    702 
    703 /** Get a batch of pending initiated transactions not attempted since [start] */
    704 pub async fn pending_batch(
    705     db: &mut PgConnection,
    706     start: &Timestamp,
    707 ) -> sqlx::Result<Vec<Initiated>> {
    708     serialized!(
    709         sqlx::query(
    710             "
    711                 SELECT initiated_id, amount, subject, credit_account, credit_name
    712                 FROM initiated
    713                 WHERE magnet_code IS NULL
    714                     AND status='pending'
    715                     AND (last_submitted IS NULL OR last_submitted < $1)
    716                 LIMIT 100
    717             ",
    718         )
    719         .bind_timestamp(start)
    720         .try_map(|r: PgRow| {
    721             Ok(Initiated {
    722                 id: r.try_get_u64(0)?,
    723                 amount: r.try_get_amount(1, &CURR)?,
    724                 subject: r.try_get(2)?,
    725                 creditor: FullHuPayto::new(r.try_get_parse(3)?, r.try_get(4)?),
    726             })
    727         })
    728         .fetch_all(&mut *db)
    729     )
    730 }
    731 
    732 /** Get an initiated transaction matching the given magnet [code] */
    733 pub async fn initiated_by_code(
    734     db: &mut PgConnection,
    735     code: u64,
    736 ) -> sqlx::Result<Option<Initiated>> {
    737     serialized!(
    738         sqlx::query(
    739             "
    740                 SELECT initiated_id, amount, subject, credit_account, credit_name
    741                 FROM initiated
    742                 WHERE magnet_code IS $1
    743             ",
    744         )
    745         .bind(code as i64)
    746         .try_map(|r: PgRow| {
    747             Ok(Initiated {
    748                 id: r.try_get_u64(0)?,
    749                 amount: r.try_get_amount(1, &CURR)?,
    750                 subject: r.try_get(2)?,
    751                 creditor: FullHuPayto::new(r.try_get_parse(3)?, r.try_get(4)?),
    752             })
    753         })
    754         .fetch_optional(&mut *db)
    755     )
    756 }
    757 
    758 /** Update status of a successful submitted initiated transaction */
    759 pub async fn initiated_submit_success(
    760     db: &mut PgConnection,
    761     id: u64,
    762     timestamp: &Timestamp,
    763     magnet_code: u64,
    764 ) -> sqlx::Result<()> {
    765     serialized!(
    766         sqlx::query(
    767             "
    768                 UPDATE initiated
    769                 SET status='pending', submission_counter=submission_counter+1, last_submitted=$1, magnet_code=$2
    770                 WHERE initiated_id=$3
    771             "
    772         ).bind_timestamp(timestamp)
    773         .bind(magnet_code as i64)
    774         .bind(id as i64)
    775         .execute(&mut *db)
    776     )?;
    777     Ok(())
    778 }
    779 
    780 /** Update status of a permanently failed initiated transaction */
    781 pub async fn initiated_submit_permanent_failure(
    782     db: &mut PgConnection,
    783     id: u64,
    784     timestamp: &Timestamp,
    785     msg: &str,
    786 ) -> sqlx::Result<()> {
    787     serialized!(
    788         sqlx::query(
    789             "
    790                 UPDATE initiated
    791                 SET status='permanent_failure', status_msg=$2
    792                 WHERE initiated_id=$3
    793             ",
    794         )
    795         .bind_timestamp(timestamp)
    796         .bind(msg)
    797         .bind(id as i64)
    798         .execute(&mut *db)
    799     )?;
    800     Ok(())
    801 }
    802 
    803 /** Check if an initiated transaction exist for a magnet code */
    804 pub async fn initiated_exists_for_code(
    805     db: &mut PgConnection,
    806     code: u64,
    807 ) -> sqlx::Result<Option<u64>> {
    808     serialized!(
    809         sqlx::query("SELECT initiated_id FROM initiated WHERE magnet_code=$1")
    810             .bind(code as i64)
    811             .try_map(|r| Ok(r.try_get::<i64, _>(0)? as u64))
    812             .fetch_optional(&mut *db)
    813     )
    814 }
    815 
    816 /** Get JSON value from KV table */
    817 pub async fn kv_get<T: DeserializeOwned + Unpin + Send>(
    818     db: &mut PgConnection,
    819     key: &str,
    820 ) -> sqlx::Result<Option<T>> {
    821     serialized!(
    822         sqlx::query("SELECT value FROM kv WHERE key=$1")
    823             .bind(key)
    824             .try_map(|r| Ok(r.try_get::<sqlx::types::Json<T>, _>(0)?.0))
    825             .fetch_optional(&mut *db)
    826     )
    827 }
    828 
    829 /** Set JSON value in KV table */
    830 pub async fn kv_set<T: Serialize>(db: &mut PgConnection, key: &str, value: &T) -> sqlx::Result<()> {
    831     serialized!(
    832         sqlx::query("INSERT INTO kv (key, value) VALUES ($1, $2) ON CONFLICT (key) DO UPDATE SET value=EXCLUDED.value")
    833             .bind(key)
    834             .bind(sqlx::types::Json(value))
    835             .execute(&mut *db)
    836     )?;
    837     Ok(())
    838 }
    839 
    840 pub enum RegistrationResult {
    841     Success,
    842     ReservePubReuse,
    843 }
    844 
    845 pub async fn transfer_register(
    846     db: &PgPool,
    847     req: &RegistrationRequest,
    848 ) -> sqlx::Result<RegistrationResult> {
    849     let ty: IncomingType = req.r#type.into();
    850     serialized!(
    851         sqlx::query(
    852             "SELECT out_reserve_pub_reuse FROM register_prepared_transfers($1,$2,$3,$4,$5,$6)"
    853         )
    854         .bind(ty)
    855         .bind(&req.account_pub)
    856         .bind(&req.authorization_pub)
    857         .bind(&req.authorization_sig)
    858         .bind(req.recurrent)
    859         .bind_timestamp(&Timestamp::now())
    860         .try_map(|r: PgRow| {
    861             Ok(if r.try_get_flag("out_reserve_pub_reuse")? {
    862                 RegistrationResult::ReservePubReuse
    863             } else {
    864                 RegistrationResult::Success
    865             })
    866         })
    867         .fetch_one(db)
    868     )
    869 }
    870 
    871 pub async fn transfer_unregister(db: &PgPool, req: &Unregistration) -> sqlx::Result<bool> {
    872     serialized!(
    873         sqlx::query("SELECT out_found FROM delete_prepared_transfers($1,$2)")
    874             .bind(&req.authorization_pub)
    875             .bind_timestamp(&Timestamp::now())
    876             .try_map(|r: PgRow| r.try_get_flag("out_found"))
    877             .fetch_one(db)
    878     )
    879 }
    880 
    881 #[cfg(test)]
    882 mod test {
    883     use jiff::{Span, Timestamp, Zoned};
    884     use serde_json::json;
    885     use sqlx::{PgPool, Postgres, pool::PoolConnection, postgres::PgRow};
    886     use taler_api::{
    887         db::TypeHelper,
    888         notification::dummy_listen,
    889         subject::{IncomingSubject, OutgoingSubject},
    890     };
    891     use taler_common::{
    892         api::{
    893             EddsaPublicKey, HashCode, ShortHashCode,
    894             params::{History, Page},
    895         },
    896         types::{
    897             amount::{amount, decimal},
    898             url,
    899             utils::now_sql_stable_ts,
    900         },
    901     };
    902 
    903     use super::TxInAdmin;
    904     use crate::{
    905         constants::CONFIG_SOURCE,
    906         db::{
    907             self, AddIncomingResult, AddOutgoingResult, BounceResult, Initiated, OutFailureResult,
    908             TransferResult, TxIn, TxOut, TxOutKind, kv_get, kv_set, make_transfer,
    909             register_bounce_tx_in, register_tx_in, register_tx_in_admin, register_tx_out,
    910         },
    911         magnet_api::types::TxStatus,
    912         magnet_payto,
    913     };
    914 
    915     async fn setup() -> (PoolConnection<Postgres>, PgPool) {
    916         taler_test_utils::db::db_test_setup(CONFIG_SOURCE).await
    917     }
    918 
    919     #[tokio::test]
    920     async fn kv() {
    921         let (mut db, _) = setup().await;
    922 
    923         let value = json!({
    924             "name": "Mr Smith",
    925             "no way": 32
    926         });
    927 
    928         assert_eq!(
    929             kv_get::<serde_json::Value>(&mut db, "value").await.unwrap(),
    930             None
    931         );
    932         kv_set(&mut db, "value", &value).await.unwrap();
    933         kv_set(&mut db, "value", &value).await.unwrap();
    934         assert_eq!(
    935             kv_get::<serde_json::Value>(&mut db, "value").await.unwrap(),
    936             Some(value)
    937         );
    938     }
    939 
    940     #[tokio::test]
    941     async fn tx_in() {
    942         let (mut db, pool) = setup().await;
    943 
    944         let mut routine = async |first: &Option<IncomingSubject>,
    945                                  second: &Option<IncomingSubject>| {
    946             let (id, code) =
    947                 sqlx::query("SELECT count(*) + 1, COALESCE(max(magnet_code), 0) + 20 FROM tx_in")
    948                     .try_map(|r: PgRow| Ok((r.try_get_u64(0)?, r.try_get_u64(1)?)))
    949                     .fetch_one(&mut *db)
    950                     .await
    951                     .unwrap();
    952             let now = now_sql_stable_ts();
    953             let date = Zoned::now().date();
    954             let later = date.tomorrow().unwrap();
    955             let tx = TxIn {
    956                 code,
    957                 amount: amount("EUR:10"),
    958                 subject: "subject".into(),
    959                 debtor: magnet_payto(
    960                     "payto://iban/HU30162000031000163100000000?receiver-name=name",
    961                 ),
    962                 value_date: date,
    963                 status: TxStatus::Completed,
    964             };
    965             // Insert
    966             assert_eq!(
    967                 register_tx_in(&mut db, &tx, first, &now)
    968                     .await
    969                     .expect("register tx in"),
    970                 AddIncomingResult::Success {
    971                     new: true,
    972                     pending: false,
    973                     row_id: id,
    974                     valued_at: date
    975                 }
    976             );
    977             // Idempotent
    978             assert_eq!(
    979                 register_tx_in(
    980                     &mut db,
    981                     &TxIn {
    982                         value_date: later,
    983                         ..tx.clone()
    984                     },
    985                     first,
    986                     &now
    987                 )
    988                 .await
    989                 .expect("register tx in"),
    990                 AddIncomingResult::Success {
    991                     new: false,
    992                     pending: false,
    993                     row_id: id,
    994                     valued_at: date
    995                 }
    996             );
    997             // Many
    998             assert_eq!(
    999                 register_tx_in(
   1000                     &mut db,
   1001                     &TxIn {
   1002                         code: code + 1,
   1003                         value_date: later,
   1004                         ..tx
   1005                     },
   1006                     second,
   1007                     &now
   1008                 )
   1009                 .await
   1010                 .expect("register tx in"),
   1011                 AddIncomingResult::Success {
   1012                     new: true,
   1013                     pending: false,
   1014                     row_id: id + 1,
   1015                     valued_at: later
   1016                 }
   1017             );
   1018         };
   1019 
   1020         // Empty db
   1021         assert_eq!(
   1022             db::revenue_history(&pool, &History::default(), dummy_listen)
   1023                 .await
   1024                 .unwrap(),
   1025             Vec::new()
   1026         );
   1027         assert_eq!(
   1028             db::incoming_history(&pool, &History::default(), dummy_listen)
   1029                 .await
   1030                 .unwrap(),
   1031             Vec::new()
   1032         );
   1033 
   1034         // Regular transaction
   1035         routine(&None, &None).await;
   1036 
   1037         // Reserve transaction
   1038         routine(
   1039             &Some(IncomingSubject::Reserve(EddsaPublicKey::rand())),
   1040             &Some(IncomingSubject::Reserve(EddsaPublicKey::rand())),
   1041         )
   1042         .await;
   1043 
   1044         // Kyc transaction
   1045         routine(
   1046             &Some(IncomingSubject::Kyc(EddsaPublicKey::rand())),
   1047             &Some(IncomingSubject::Kyc(EddsaPublicKey::rand())),
   1048         )
   1049         .await;
   1050 
   1051         // History
   1052         assert_eq!(
   1053             db::revenue_history(&pool, &History::default(), dummy_listen)
   1054                 .await
   1055                 .unwrap()
   1056                 .len(),
   1057             6
   1058         );
   1059         assert_eq!(
   1060             db::incoming_history(&pool, &History::default(), dummy_listen)
   1061                 .await
   1062                 .unwrap()
   1063                 .len(),
   1064             4
   1065         );
   1066     }
   1067 
   1068     #[tokio::test]
   1069     async fn tx_in_admin() {
   1070         let (_, pool) = setup().await;
   1071 
   1072         // Empty db
   1073         assert_eq!(
   1074             db::incoming_history(&pool, &History::default(), dummy_listen)
   1075                 .await
   1076                 .unwrap(),
   1077             Vec::new()
   1078         );
   1079 
   1080         let now = now_sql_stable_ts();
   1081         let later = now + Span::new().hours(2);
   1082         let date = Zoned::now().date();
   1083         let tx = TxInAdmin {
   1084             amount: amount("EUR:10"),
   1085             subject: "subject".to_owned(),
   1086             debtor: magnet_payto("payto://iban/HU30162000031000163100000000?receiver-name=name"),
   1087             metadata: IncomingSubject::Reserve(EddsaPublicKey::rand()),
   1088         };
   1089         // Insert
   1090         assert_eq!(
   1091             register_tx_in_admin(&pool, &tx, &now)
   1092                 .await
   1093                 .expect("register tx in"),
   1094             AddIncomingResult::Success {
   1095                 new: true,
   1096                 pending: false,
   1097                 row_id: 1,
   1098                 valued_at: date
   1099             }
   1100         );
   1101         // Many
   1102         assert_eq!(
   1103             register_tx_in_admin(
   1104                 &pool,
   1105                 &TxInAdmin {
   1106                     subject: "Other".to_owned(),
   1107                     metadata: IncomingSubject::Reserve(EddsaPublicKey::rand()),
   1108                     ..tx.clone()
   1109                 },
   1110                 &later
   1111             )
   1112             .await
   1113             .expect("register tx in"),
   1114             AddIncomingResult::Success {
   1115                 new: true,
   1116                 pending: false,
   1117                 row_id: 2,
   1118                 valued_at: date
   1119             }
   1120         );
   1121 
   1122         // History
   1123         assert_eq!(
   1124             db::incoming_history(&pool, &History::default(), dummy_listen)
   1125                 .await
   1126                 .unwrap()
   1127                 .len(),
   1128             2
   1129         );
   1130     }
   1131 
   1132     #[tokio::test]
   1133     async fn tx_out() {
   1134         let (mut db, pool) = setup().await;
   1135 
   1136         let mut routine = async |first: &TxOutKind, second: &TxOutKind| {
   1137             let (id, code) =
   1138                 sqlx::query("SELECT count(*) + 1, COALESCE(max(magnet_code), 0) + 20 FROM tx_out")
   1139                     .try_map(|r: PgRow| Ok((r.try_get_u64(0)?, r.try_get_u64(1)?)))
   1140                     .fetch_one(&mut *db)
   1141                     .await
   1142                     .unwrap();
   1143             let now = now_sql_stable_ts();
   1144             let date = Zoned::now().date();
   1145             let later = date.tomorrow().unwrap();
   1146             let tx = TxOut {
   1147                 code,
   1148                 amount: amount("HUF:10"),
   1149                 subject: "subject".into(),
   1150                 creditor: magnet_payto(
   1151                     "payto://iban/HU30162000031000163100000000?receiver-name=name",
   1152                 ),
   1153                 value_date: date,
   1154                 status: TxStatus::Completed,
   1155             };
   1156             assert!(matches!(
   1157                 make_transfer(
   1158                     &pool,
   1159                     &db::Transfer {
   1160                         request_uid: HashCode::rand(),
   1161                         amount: decimal("10"),
   1162                         exchange_base_url: url("https://exchange.test.com/"),
   1163                         metadata: None,
   1164                         wtid: ShortHashCode::rand(),
   1165                         creditor: tx.creditor.clone()
   1166                     },
   1167                     &now
   1168                 )
   1169                 .await
   1170                 .unwrap(),
   1171                 TransferResult::Success { .. }
   1172             ));
   1173             db::initiated_submit_success(&mut db, 1, &Timestamp::now(), tx.code)
   1174                 .await
   1175                 .expect("status success");
   1176 
   1177             // Insert
   1178             assert_eq!(
   1179                 register_tx_out(&mut db, &tx, first, &now)
   1180                     .await
   1181                     .expect("register tx out"),
   1182                 AddOutgoingResult {
   1183                     result: db::RegisterResult::known,
   1184                     row_id: id,
   1185                 }
   1186             );
   1187             // Idempotent
   1188             assert_eq!(
   1189                 register_tx_out(
   1190                     &mut db,
   1191                     &TxOut {
   1192                         value_date: later,
   1193                         ..tx.clone()
   1194                     },
   1195                     first,
   1196                     &now
   1197                 )
   1198                 .await
   1199                 .expect("register tx out"),
   1200                 AddOutgoingResult {
   1201                     result: db::RegisterResult::idempotent,
   1202                     row_id: id,
   1203                 }
   1204             );
   1205             // Recovered
   1206             assert_eq!(
   1207                 register_tx_out(
   1208                     &mut db,
   1209                     &TxOut {
   1210                         code: code + 1,
   1211                         value_date: later,
   1212                         ..tx.clone()
   1213                     },
   1214                     second,
   1215                     &now
   1216                 )
   1217                 .await
   1218                 .expect("register tx out"),
   1219                 AddOutgoingResult {
   1220                     result: db::RegisterResult::recovered,
   1221                     row_id: id + 1,
   1222                 }
   1223             );
   1224         };
   1225 
   1226         // Empty db
   1227         assert_eq!(
   1228             db::outgoing_history(&pool, &History::default(), dummy_listen)
   1229                 .await
   1230                 .unwrap(),
   1231             Vec::new()
   1232         );
   1233 
   1234         // Regular transaction
   1235         routine(&TxOutKind::Simple, &TxOutKind::Simple).await;
   1236 
   1237         // Talerable transaction
   1238         routine(
   1239             &TxOutKind::Talerable(OutgoingSubject::rand()),
   1240             &TxOutKind::Talerable(OutgoingSubject::rand()),
   1241         )
   1242         .await;
   1243 
   1244         // Bounced transaction
   1245         routine(&TxOutKind::Bounce(21), &TxOutKind::Bounce(42)).await;
   1246 
   1247         // History
   1248         assert_eq!(
   1249             db::outgoing_history(&pool, &History::default(), dummy_listen)
   1250                 .await
   1251                 .unwrap()
   1252                 .len(),
   1253             2
   1254         );
   1255     }
   1256 
   1257     #[tokio::test]
   1258     async fn tx_out_failure() {
   1259         let (mut db, pool) = setup().await;
   1260 
   1261         let now = now_sql_stable_ts();
   1262 
   1263         // Unknown
   1264         assert_eq!(
   1265             db::register_tx_out_failure(&mut db, 42, None, &now)
   1266                 .await
   1267                 .unwrap(),
   1268             OutFailureResult {
   1269                 initiated_id: None,
   1270                 new: false
   1271             }
   1272         );
   1273         assert_eq!(
   1274             db::register_tx_out_failure(&mut db, 42, Some(12), &now)
   1275                 .await
   1276                 .unwrap(),
   1277             OutFailureResult {
   1278                 initiated_id: None,
   1279                 new: false
   1280             }
   1281         );
   1282 
   1283         // Initiated
   1284         let req = db::Transfer {
   1285             request_uid: HashCode::rand(),
   1286             amount: decimal("10"),
   1287             exchange_base_url: url("https://exchange.test.com/"),
   1288             metadata: None,
   1289             wtid: ShortHashCode::rand(),
   1290             creditor: magnet_payto("payto://iban/HU30162000031000163100000000?receiver-name=name"),
   1291         };
   1292         let payto = magnet_payto("payto://iban/HU30162000031000163100000000?receiver-name=name");
   1293         assert_eq!(
   1294             make_transfer(&pool, &req, &now).await.unwrap(),
   1295             TransferResult::Success {
   1296                 id: 1,
   1297                 initiated_at: now
   1298             }
   1299         );
   1300         db::initiated_submit_success(&mut db, 1, &Timestamp::now(), 34)
   1301             .await
   1302             .expect("status success");
   1303         assert_eq!(
   1304             db::register_tx_out_failure(&mut db, 34, None, &now)
   1305                 .await
   1306                 .unwrap(),
   1307             OutFailureResult {
   1308                 initiated_id: Some(1),
   1309                 new: true
   1310             }
   1311         );
   1312         assert_eq!(
   1313             db::register_tx_out_failure(&mut db, 34, None, &now)
   1314                 .await
   1315                 .unwrap(),
   1316             OutFailureResult {
   1317                 initiated_id: Some(1),
   1318                 new: false
   1319             }
   1320         );
   1321 
   1322         // Recovered bounce
   1323         let tx = TxIn {
   1324             code: 12,
   1325             amount: amount("HUF:11"),
   1326             subject: "malformed transaction".into(),
   1327             debtor: payto,
   1328             value_date: Zoned::now().date(),
   1329             status: TxStatus::Completed,
   1330         };
   1331         assert_eq!(
   1332             db::register_bounce_tx_in(&mut db, &tx, "no reason", &now)
   1333                 .await
   1334                 .unwrap(),
   1335             BounceResult {
   1336                 tx_id: 1,
   1337                 tx_new: true,
   1338                 bounce_id: 2,
   1339                 bounce_new: true
   1340             }
   1341         );
   1342         assert_eq!(
   1343             db::register_tx_out_failure(&mut db, 10, Some(12), &now)
   1344                 .await
   1345                 .unwrap(),
   1346             OutFailureResult {
   1347                 initiated_id: Some(2),
   1348                 new: true
   1349             }
   1350         );
   1351         assert_eq!(
   1352             db::register_tx_out_failure(&mut db, 10, Some(12), &now)
   1353                 .await
   1354                 .unwrap(),
   1355             OutFailureResult {
   1356                 initiated_id: Some(2),
   1357                 new: false
   1358             }
   1359         );
   1360     }
   1361 
   1362     #[tokio::test]
   1363     async fn transfer() {
   1364         let (_, pool) = setup().await;
   1365 
   1366         // Empty db
   1367         assert_eq!(db::transfer_by_id(&pool, 0).await.unwrap(), None);
   1368         assert_eq!(
   1369             db::transfer_page(&pool, &None, &Page::default())
   1370                 .await
   1371                 .unwrap(),
   1372             Vec::new()
   1373         );
   1374 
   1375         let req = db::Transfer {
   1376             request_uid: HashCode::rand(),
   1377             amount: decimal("10"),
   1378             exchange_base_url: url("https://exchange.test.com/"),
   1379             metadata: None,
   1380             wtid: ShortHashCode::rand(),
   1381             creditor: magnet_payto("payto://iban/HU02162000031000164800000000?receiver-name=name"),
   1382         };
   1383         let now = now_sql_stable_ts();
   1384         let later = now + Span::new().hours(2);
   1385         // Insert
   1386         assert_eq!(
   1387             make_transfer(&pool, &req, &now).await.expect("transfer"),
   1388             TransferResult::Success {
   1389                 id: 1,
   1390                 initiated_at: now
   1391             }
   1392         );
   1393         // Idempotent
   1394         assert_eq!(
   1395             make_transfer(&pool, &req, &later).await.expect("transfer"),
   1396             TransferResult::Success {
   1397                 id: 1,
   1398                 initiated_at: now
   1399             }
   1400         );
   1401         // Request UID reuse
   1402         assert_eq!(
   1403             make_transfer(
   1404                 &pool,
   1405                 &db::Transfer {
   1406                     wtid: ShortHashCode::rand(),
   1407                     ..req.clone()
   1408                 },
   1409                 &now
   1410             )
   1411             .await
   1412             .expect("transfer"),
   1413             TransferResult::RequestUidReuse
   1414         );
   1415         // wtid reuse
   1416         assert_eq!(
   1417             make_transfer(
   1418                 &pool,
   1419                 &db::Transfer {
   1420                     request_uid: HashCode::rand(),
   1421                     ..req.clone()
   1422                 },
   1423                 &now
   1424             )
   1425             .await
   1426             .expect("transfer"),
   1427             TransferResult::WtidReuse
   1428         );
   1429         // Many
   1430         assert_eq!(
   1431             make_transfer(
   1432                 &pool,
   1433                 &db::Transfer {
   1434                     request_uid: HashCode::rand(),
   1435                     wtid: ShortHashCode::rand(),
   1436                     ..req
   1437                 },
   1438                 &later
   1439             )
   1440             .await
   1441             .expect("transfer"),
   1442             TransferResult::Success {
   1443                 id: 2,
   1444                 initiated_at: later
   1445             }
   1446         );
   1447 
   1448         // Get
   1449         assert!(db::transfer_by_id(&pool, 1).await.unwrap().is_some());
   1450         assert!(db::transfer_by_id(&pool, 2).await.unwrap().is_some());
   1451         assert!(db::transfer_by_id(&pool, 3).await.unwrap().is_none());
   1452         assert_eq!(
   1453             db::transfer_page(&pool, &None, &Page::default())
   1454                 .await
   1455                 .unwrap()
   1456                 .len(),
   1457             2
   1458         );
   1459     }
   1460 
   1461     #[tokio::test]
   1462     async fn bounce() {
   1463         let (mut db, _) = setup().await;
   1464 
   1465         let amount = amount("HUF:10");
   1466         let payto = magnet_payto("payto://iban/HU30162000031000163100000000?receiver-name=name");
   1467         let now = now_sql_stable_ts();
   1468         let date = Zoned::now().date();
   1469 
   1470         // Empty db
   1471         assert!(db::pending_batch(&mut db, &now).await.unwrap().is_empty());
   1472 
   1473         // Insert
   1474         assert_eq!(
   1475             register_tx_in(
   1476                 &mut db,
   1477                 &TxIn {
   1478                     code: 13,
   1479                     amount,
   1480                     subject: "subject".into(),
   1481                     debtor: payto.clone(),
   1482                     value_date: date,
   1483                     status: TxStatus::Completed
   1484                 },
   1485                 &None,
   1486                 &now
   1487             )
   1488             .await
   1489             .expect("register tx in"),
   1490             AddIncomingResult::Success {
   1491                 new: true,
   1492                 pending: false,
   1493                 row_id: 1,
   1494                 valued_at: date
   1495             }
   1496         );
   1497 
   1498         // Bounce
   1499         assert_eq!(
   1500             register_bounce_tx_in(
   1501                 &mut db,
   1502                 &TxIn {
   1503                     code: 12,
   1504                     amount,
   1505                     subject: "subject".into(),
   1506                     debtor: payto.clone(),
   1507                     value_date: date,
   1508                     status: TxStatus::Completed
   1509                 },
   1510                 "good reason",
   1511                 &now
   1512             )
   1513             .await
   1514             .expect("bounce"),
   1515             BounceResult {
   1516                 tx_id: 2,
   1517                 tx_new: true,
   1518                 bounce_id: 1,
   1519                 bounce_new: true
   1520             }
   1521         );
   1522         // Idempotent
   1523         assert_eq!(
   1524             register_bounce_tx_in(
   1525                 &mut db,
   1526                 &TxIn {
   1527                     code: 12,
   1528                     amount,
   1529                     subject: "subject".into(),
   1530                     debtor: payto.clone(),
   1531                     value_date: date,
   1532                     status: TxStatus::Completed
   1533                 },
   1534                 "good reason",
   1535                 &now
   1536             )
   1537             .await
   1538             .expect("bounce"),
   1539             BounceResult {
   1540                 tx_id: 2,
   1541                 tx_new: false,
   1542                 bounce_id: 1,
   1543                 bounce_new: false
   1544             }
   1545         );
   1546 
   1547         // Bounce registered
   1548         assert_eq!(
   1549             register_bounce_tx_in(
   1550                 &mut db,
   1551                 &TxIn {
   1552                     code: 13,
   1553                     amount,
   1554                     subject: "subject".into(),
   1555                     debtor: payto.clone(),
   1556                     value_date: date,
   1557                     status: TxStatus::Completed
   1558                 },
   1559                 "good reason",
   1560                 &now
   1561             )
   1562             .await
   1563             .expect("bounce"),
   1564             BounceResult {
   1565                 tx_id: 1,
   1566                 tx_new: false,
   1567                 bounce_id: 2,
   1568                 bounce_new: true
   1569             }
   1570         );
   1571         // Idempotent registered
   1572         assert_eq!(
   1573             register_bounce_tx_in(
   1574                 &mut db,
   1575                 &TxIn {
   1576                     code: 13,
   1577                     amount,
   1578                     subject: "subject".into(),
   1579                     debtor: payto.clone(),
   1580                     value_date: date,
   1581                     status: TxStatus::Completed
   1582                 },
   1583                 "good reason",
   1584                 &now
   1585             )
   1586             .await
   1587             .expect("bounce"),
   1588             BounceResult {
   1589                 tx_id: 1,
   1590                 tx_new: false,
   1591                 bounce_id: 2,
   1592                 bounce_new: false
   1593             }
   1594         );
   1595 
   1596         // Batch
   1597         assert_eq!(
   1598             db::pending_batch(&mut db, &now).await.unwrap(),
   1599             &[
   1600                 Initiated {
   1601                     id: 1,
   1602                     amount,
   1603                     subject: "bounce: 12".into(),
   1604                     creditor: payto.clone()
   1605                 },
   1606                 Initiated {
   1607                     id: 2,
   1608                     amount,
   1609                     subject: "bounce: 13".into(),
   1610                     creditor: payto
   1611                 }
   1612             ]
   1613         );
   1614     }
   1615 
   1616     #[tokio::test]
   1617     async fn status() {
   1618         let (mut db, _) = setup().await;
   1619 
   1620         // Unknown transfer
   1621         db::initiated_submit_permanent_failure(&mut db, 1, &Timestamp::now(), "msg")
   1622             .await
   1623             .unwrap();
   1624         db::initiated_submit_success(&mut db, 1, &Timestamp::now(), 12)
   1625             .await
   1626             .unwrap();
   1627     }
   1628 
   1629     #[tokio::test]
   1630     async fn batch() {
   1631         let (mut db, pool) = setup().await;
   1632         let start = Timestamp::now();
   1633         let magnet_payto =
   1634             magnet_payto("payto://iban/HU30162000031000163100000000?receiver-name=name");
   1635 
   1636         // Empty db
   1637         let pendings = db::pending_batch(&mut db, &start)
   1638             .await
   1639             .expect("pending_batch");
   1640         assert_eq!(pendings.len(), 0);
   1641 
   1642         // Some transfers
   1643         for i in 0..3 {
   1644             make_transfer(
   1645                 &pool,
   1646                 &db::Transfer {
   1647                     request_uid: HashCode::rand(),
   1648                     amount: decimal(format!("{}", i + 1)),
   1649                     exchange_base_url: url("https://exchange.test.com/"),
   1650                     metadata: None,
   1651                     wtid: ShortHashCode::rand(),
   1652                     creditor: magnet_payto.clone(),
   1653                 },
   1654                 &Timestamp::now(),
   1655             )
   1656             .await
   1657             .expect("transfer");
   1658         }
   1659         let pendings = db::pending_batch(&mut db, &start)
   1660             .await
   1661             .expect("pending_batch");
   1662         assert_eq!(pendings.len(), 3);
   1663 
   1664         // Max 100 txs in batch
   1665         for i in 0..100 {
   1666             make_transfer(
   1667                 &pool,
   1668                 &db::Transfer {
   1669                     request_uid: HashCode::rand(),
   1670                     amount: decimal(format!("{}", i + 1)),
   1671                     exchange_base_url: url("https://exchange.test.com/"),
   1672                     metadata: None,
   1673                     wtid: ShortHashCode::rand(),
   1674                     creditor: magnet_payto.clone(),
   1675                 },
   1676                 &Timestamp::now(),
   1677             )
   1678             .await
   1679             .expect("transfer");
   1680         }
   1681         let pendings = db::pending_batch(&mut db, &start)
   1682             .await
   1683             .expect("pending_batch");
   1684         assert_eq!(pendings.len(), 100);
   1685 
   1686         // Skip uploaded
   1687         for i in 0..=10 {
   1688             db::initiated_submit_success(&mut db, i, &Timestamp::now(), i)
   1689                 .await
   1690                 .expect("status success");
   1691         }
   1692         let pendings = db::pending_batch(&mut db, &start)
   1693             .await
   1694             .expect("pending_batch");
   1695         assert_eq!(pendings.len(), 93);
   1696 
   1697         // Skip failed
   1698         for i in 0..=10 {
   1699             db::initiated_submit_permanent_failure(&mut db, 10 + i, &Timestamp::now(), "failure")
   1700                 .await
   1701                 .expect("status failure");
   1702         }
   1703         let pendings = db::pending_batch(&mut db, &start)
   1704             .await
   1705             .expect("pending_batch");
   1706         assert_eq!(pendings.len(), 83);
   1707     }
   1708 }