taler-rust

GNU Taler code in Rust. Largely core banking integrations.
Log | Files | Refs | Submodules | README | LICENSE

db.rs (50241B)


      1 /*
      2   This file is part of TALER
      3   Copyright (C) 2025, 2026 Taler Systems SA
      4 
      5   TALER is free software; you can redistribute it and/or modify it under the
      6   terms of the GNU Affero General Public License as published by the Free Software
      7   Foundation; either version 3, or (at your option) any later version.
      8 
      9   TALER is distributed in the hope that it will be useful, but WITHOUT ANY
     10   WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
     11   A PARTICULAR PURPOSE.  See the GNU Affero General Public License for more details.
     12 
     13   You should have received a copy of the GNU Affero General Public License along with
     14   TALER; see the file COPYING.  If not, see <http://www.gnu.org/licenses/>
     15 */
     16 
     17 use std::fmt::Display;
     18 
     19 use compact_str::CompactString;
     20 use jiff::{Timestamp, civil::Date, tz::TimeZone};
     21 use serde::{Serialize, de::DeserializeOwned};
     22 use sqlx::{PgConnection, PgPool, QueryBuilder, Row, postgres::PgRow};
     23 use taler_api::{
     24     db::{BindHelper, TypeHelper, history, page},
     25     serialized,
     26     subject::{IncomingSubject, OutgoingSubject, fmt_out_subject},
     27 };
     28 use taler_common::{
     29     api::{
     30         HashCode, ShortHashCode,
     31         params::{History, Page},
     32         prepared::{RegistrationRequest, Unregistration},
     33         revenue::RevenueIncomingBankTransaction,
     34         wire::{
     35             IncomingBankTransaction, OutgoingBankTransaction, TransferListStatus, TransferState,
     36             TransferStatus,
     37         },
     38     },
     39     config::Config,
     40     db::IncomingType,
     41     types::{
     42         amount::{Amount, Decimal},
     43         payto::PaytoImpl as _,
     44     },
     45 };
     46 use tokio::sync::watch::{Receiver, Sender};
     47 use url::Url;
     48 
     49 use crate::{FullHuPayto, config::parse_db_cfg, constants::CURR, magnet_api::types::TxStatus};
     50 
/// Schema name handed to `taler_common::db::pool` whenever a connection pool is opened.
const SCHEMA: &str = "magnet_bank";
     52 
     53 pub async fn pool(cfg: &Config) -> anyhow::Result<PgPool> {
     54     let db = parse_db_cfg(cfg)?;
     55     let pool = taler_common::db::pool(db.cfg, SCHEMA).await?;
     56     Ok(pool)
     57 }
     58 
     59 pub async fn dbinit(cfg: &Config, reset: bool) -> anyhow::Result<PgPool> {
     60     let db_cfg = parse_db_cfg(cfg)?;
     61     let pool = taler_common::db::pool(db_cfg.cfg, SCHEMA).await?;
     62     let mut db = pool.acquire().await?;
     63     taler_common::db::dbinit(&mut db, db_cfg.sql_dir.as_ref(), "magnet-bank", reset).await?;
     64     Ok(pool)
     65 }
     66 
/// Forward database notifications to the matching watch channels.
///
/// The work is delegated to the `taler_api::notification::notification_listener!` macro,
/// which is given the pool and four named handlers. Each handler receives an `i64` row id and
/// stores it in its sender with `send_replace`:
///
/// * `"tx_in"` -> `in_channel`
/// * `"taler_in"` -> `taler_in_channel`
/// * `"tx_out"` -> `out_channel`
/// * `"taler_out"` -> `taler_out_channel`
///
/// NOTE(review): the names are presumably PostgreSQL notification channels and the macro
/// presumably loops forever; confirm against the macro definition.
pub async fn notification_listener(
    pool: PgPool,
    in_channel: Sender<i64>,
    taler_in_channel: Sender<i64>,
    out_channel: Sender<i64>,
    taler_out_channel: Sender<i64>,
) {
    taler_api::notification::notification_listener!(&pool,
        "tx_in" => (row_id: i64) {
            in_channel.send_replace(row_id);
        },
        "taler_in" => (row_id: i64) {
            taler_in_channel.send_replace(row_id);
        },
        "tx_out" => (row_id: i64) {
            out_channel.send_replace(row_id);
        },
        "taler_out" => (row_id: i64) {
            taler_out_channel.send_replace(row_id);
        }
    )
}
     89 
     90 #[derive(Debug, Clone)]
     91 pub struct TxIn {
     92     pub code: u64,
     93     pub amount: Amount,
     94     pub subject: Box<str>,
     95     pub debtor: FullHuPayto,
     96     pub value_date: Date,
     97     pub status: TxStatus,
     98 }
     99 
    100 impl Display for TxIn {
    101     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
    102         let Self {
    103             code,
    104             amount,
    105             subject,
    106             debtor,
    107             value_date,
    108             status,
    109         } = self;
    110         write!(
    111             f,
    112             "{value_date} {code} {amount} ({} {}) {status:?} '{subject}'",
    113             debtor.bban(),
    114             debtor.name
    115         )
    116     }
    117 }
    118 
    119 #[derive(Debug, Clone)]
    120 pub struct TxOut {
    121     pub code: u64,
    122     pub amount: Amount,
    123     pub subject: Box<str>,
    124     pub creditor: FullHuPayto,
    125     pub value_date: Date,
    126     pub status: TxStatus,
    127 }
    128 
    129 impl Display for TxOut {
    130     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
    131         let Self {
    132             code,
    133             amount,
    134             subject,
    135             creditor,
    136             value_date,
    137             status,
    138         } = self;
    139         write!(
    140             f,
    141             "{value_date} {code} {amount} ({} {}) {status:?} '{subject}'",
    142             creditor.bban(),
    143             &creditor.name
    144         )
    145     }
    146 }
    147 
    148 #[derive(Debug, PartialEq, Eq)]
    149 pub struct Initiated {
    150     pub id: u64,
    151     pub amount: Amount,
    152     pub subject: Box<str>,
    153     pub creditor: FullHuPayto,
    154 }
    155 
    156 impl Display for Initiated {
    157     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
    158         let Self {
    159             id,
    160             amount,
    161             subject,
    162             creditor,
    163         } = self;
    164         write!(
    165             f,
    166             "{id} {amount} ({} {}) '{subject}'",
    167             creditor.bban(),
    168             &creditor.name
    169         )
    170     }
    171 }
    172 
/// Incoming transaction without a bank transaction code, as consumed by
/// [`register_tx_in_admin`].
#[derive(Debug, Clone)]
pub struct TxInAdmin {
    /// Transaction amount.
    pub amount: Amount,
    /// Transaction subject.
    pub subject: String,
    /// Debtor account together with its holder name.
    pub debtor: FullHuPayto,
    /// Parsed incoming subject; its `ty()` and `key()` are bound to the SQL call.
    pub metadata: IncomingSubject,
}
    180 
    181 /// Lock the database for worker execution
    182 pub async fn worker_lock(e: &mut PgConnection) -> sqlx::Result<bool> {
    183     sqlx::query("SELECT pg_try_advisory_lock(42)")
    184         .try_map(|r: PgRow| r.try_get(0))
    185         .fetch_one(e)
    186         .await
    187 }
    188 
/// Outcome of the `register_tx_in` SQL function, as decoded by [`register_tx_in`] and
/// [`register_tx_in_admin`].
#[derive(Debug, PartialEq, Eq)]
pub enum AddIncomingResult {
    /// None of the reuse / unknown-mapping flags was set; the row details are available.
    Success {
        /// Value of the `out_new` column.
        new: bool,
        /// Value of the `out_pending` column.
        pending: bool,
        /// Value of the `out_tx_row_id` column.
        row_id: u64,
        /// Value of the `out_valued_at` column.
        valued_at: Date,
    },
    /// The `out_reserve_pub_reuse` flag was set.
    ReservePubReuse,
    /// The `out_unknown_mapping` flag was set.
    UnknownMapping,
    /// The `out_mapping_reuse` flag was set.
    MappingReuse,
}
    201 
    202 pub async fn register_tx_in_admin(
    203     db: &PgPool,
    204     tx: &TxInAdmin,
    205     now: &Timestamp,
    206 ) -> sqlx::Result<AddIncomingResult> {
    207     serialized!(
    208         sqlx::query(
    209             "
    210                 SELECT out_reserve_pub_reuse, out_mapping_reuse, out_unknown_mapping, out_tx_row_id, out_valued_at, out_new, out_pending
    211                 FROM register_tx_in(NULL, $1, $2, $3, $4, $5, $6, $7, $5)
    212             ",
    213         )
    214         .bind(tx.amount)
    215         .bind(&tx.subject)
    216         .bind(tx.debtor.iban())
    217         .bind(&tx.debtor.name)
    218         .bind_date(&now.to_zoned(TimeZone::UTC).date())
    219         .bind(tx.metadata.ty())
    220         .bind(tx.metadata.key())
    221         .try_map(|r: PgRow| {
    222             Ok(if r.try_get_flag(0)? {
    223                 AddIncomingResult::ReservePubReuse
    224             } else if r.try_get_flag(1)? {
    225                 AddIncomingResult::MappingReuse
    226             } else if r.try_get_flag(2)? {
    227                 AddIncomingResult::UnknownMapping
    228             } else {
    229                 AddIncomingResult::Success {
    230                     row_id: r.try_get_u64(3)?,
    231                     valued_at: r.try_get_date(4)?,
    232                     new: r.try_get(5)?,
    233                     pending: r.try_get(6)?
    234                 }
    235             })
    236         })
    237         .fetch_one(db)
    238     )
    239 }
    240 
    241 pub async fn register_tx_in(
    242     db: &mut PgConnection,
    243     tx: &TxIn,
    244     subject: &Option<IncomingSubject>,
    245     now: &Timestamp,
    246 ) -> sqlx::Result<AddIncomingResult> {
    247     serialized!(
    248         sqlx::query(
    249             "
    250                 SELECT out_reserve_pub_reuse, out_mapping_reuse, out_unknown_mapping, out_tx_row_id, out_valued_at, out_new, out_pending
    251                 FROM register_tx_in($1, $2, $3, $4, $5, $6, $7, $8, $9)
    252             ",
    253         )
    254         .bind(tx.code as i64)
    255         .bind(tx.amount)
    256         .bind(&tx.subject)
    257         .bind(tx.debtor.iban())
    258         .bind(&tx.debtor.name)
    259         .bind_date(&tx.value_date)
    260         .bind(subject.as_ref().map(|it| it.ty()))
    261         .bind(subject.as_ref().map(|it| it.key()))
    262         .bind_timestamp(now)
    263         .try_map(|r: PgRow| {
    264             Ok(if r.try_get_flag(0)? {
    265                 AddIncomingResult::ReservePubReuse
    266             } else if r.try_get_flag(1)? {
    267                 AddIncomingResult::MappingReuse
    268             } else if r.try_get_flag(2)? {
    269                 AddIncomingResult::UnknownMapping
    270             } else {
    271                 AddIncomingResult::Success {
    272                     row_id: r.try_get_u64(3)?,
    273                     valued_at: r.try_get_date(4)?,
    274                     new: r.try_get(5)?,
    275                     pending: r.try_get(6)?
    276                 }
    277             })
    278         })
    279         .fetch_one(&mut *db)
    280     )
    281 }
    282 
/// Classification of an outgoing transaction; selects which optional arguments
/// [`register_tx_out`] passes to the SQL function.
#[derive(Debug)]
pub enum TxOutKind {
    /// No additional data: all four optional arguments are bound as `NULL`.
    Simple,
    /// Carries a `u32` id that is bound (as `i64`) to the bounced argument.
    Bounce(u32),
    /// Carries an outgoing subject whose wtid, exchange base URL and metadata are bound.
    Talerable(OutgoingSubject),
}
    289 
/// Result kind reported by the `register_tx_out` SQL function, mapped onto the
/// `register_result` SQL type.
#[derive(Debug, Clone, Copy, PartialEq, Eq, sqlx::Type)]
// Variant names are lowercase; presumably they must match the labels of the SQL type.
#[allow(non_camel_case_types)]
#[sqlx(type_name = "register_result")]
pub enum RegisterResult {
    /// Already registered
    idempotent,
    /// Initiated transaction
    known,
    /// Recovered unknown outgoing transaction
    recovered,
}
    301 
/// Outcome of [`register_tx_out`].
#[derive(Debug, PartialEq, Eq)]
pub struct AddOutgoingResult {
    /// Value of the `out_result` column.
    pub result: RegisterResult,
    /// Value of the `out_tx_row_id` column.
    pub row_id: u64,
}
    307 
    308 pub async fn register_tx_out(
    309     db: &mut PgConnection,
    310     tx: &TxOut,
    311     kind: &TxOutKind,
    312     now: &Timestamp,
    313 ) -> sqlx::Result<AddOutgoingResult> {
    314     serialized!({
    315         let query = sqlx::query(
    316             "
    317                 SELECT out_result, out_tx_row_id
    318                 FROM register_tx_out($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
    319             ",
    320         )
    321         .bind(tx.code as i64)
    322         .bind(tx.amount)
    323         .bind(&tx.subject)
    324         .bind(tx.creditor.iban())
    325         .bind(&tx.creditor.name)
    326         .bind_date(&tx.value_date);
    327         let query = match kind {
    328             TxOutKind::Simple => query
    329                 .bind(None::<&[u8]>)
    330                 .bind(None::<&str>)
    331                 .bind(None::<&str>)
    332                 .bind(None::<i64>),
    333             TxOutKind::Bounce(bounced) => query
    334                 .bind(None::<&[u8]>)
    335                 .bind(None::<&str>)
    336                 .bind(None::<&str>)
    337                 .bind(*bounced as i64),
    338             TxOutKind::Talerable(subject) => query
    339                 .bind(&subject.wtid)
    340                 .bind(subject.exchange_base_url.as_str())
    341                 .bind(&subject.metadata)
    342                 .bind(None::<i64>),
    343         };
    344         query
    345             .bind_timestamp(now)
    346             .try_map(|r: PgRow| {
    347                 Ok(AddOutgoingResult {
    348                     result: r.try_get(0)?,
    349                     row_id: r.try_get_u64(1)?,
    350                 })
    351             })
    352             .fetch_one(&mut *db)
    353     })
    354 }
    355 
/// Outcome of [`register_tx_out_failure`].
#[derive(Debug, PartialEq, Eq)]
pub struct OutFailureResult {
    /// Value of the nullable `out_initiated_id` column.
    pub initiated_id: Option<u64>,
    /// Value of the `out_new` column.
    pub new: bool,
}
    361 
    362 pub async fn register_tx_out_failure(
    363     db: &mut PgConnection,
    364     code: u64,
    365     bounced: Option<u32>,
    366     now: &Timestamp,
    367 ) -> sqlx::Result<OutFailureResult> {
    368     serialized!(
    369         sqlx::query(
    370             "
    371                 SELECT out_new, out_initiated_id
    372                 FROM register_tx_out_failure($1, $2, $3)
    373             ",
    374         )
    375         .bind(code as i64)
    376         .bind(bounced.map(|i| i as i32))
    377         .bind_timestamp(now)
    378         .try_map(|r: PgRow| {
    379             Ok(OutFailureResult {
    380                 new: r.try_get(0)?,
    381                 initiated_id: r.try_get::<Option<i64>, _>(1)?.map(|i| i as u64),
    382             })
    383         })
    384         .fetch_one(&mut *db)
    385     )
    386 }
    387 
/// Outcome of [`make_transfer`].
#[derive(Debug, PartialEq, Eq)]
pub enum TransferResult {
    /// Neither reuse flag was set; carries `out_initiated_row_id` and `out_initiated_at`.
    Success { id: u64, initiated_at: Timestamp },
    /// The `out_request_uid_reuse` flag was set.
    RequestUidReuse,
    /// The `out_wtid_reuse` flag was set.
    WtidReuse,
}
    394 
/// Transfer request consumed by [`make_transfer`].
#[derive(Debug, Clone)]
pub struct Transfer {
    /// Request identifier, bound as the first argument of the `taler_transfer` SQL function.
    pub request_uid: HashCode,
    /// Amount to transfer.
    pub amount: Decimal,
    /// Exchange base URL; also used to format the transfer subject.
    pub exchange_base_url: Url,
    /// Optional metadata; also used to format the transfer subject.
    pub metadata: Option<CompactString>,
    /// Wire transfer identifier; also used to format the transfer subject.
    pub wtid: ShortHashCode,
    /// Creditor account together with its holder name.
    pub creditor: FullHuPayto,
}
    404 
    405 pub async fn make_transfer(
    406     db: &PgPool,
    407     tx: &Transfer,
    408     now: &Timestamp,
    409 ) -> sqlx::Result<TransferResult> {
    410     let subject = fmt_out_subject(&tx.wtid, &tx.exchange_base_url, tx.metadata.as_deref());
    411     serialized!(
    412         sqlx::query(
    413             "
    414                 SELECT out_request_uid_reuse, out_wtid_reuse, out_initiated_row_id, out_initiated_at
    415                 FROM taler_transfer($1, $2, $3, $4, $5, $6, $7, $8, $9)
    416             ",
    417         )
    418         .bind(&tx.request_uid)
    419         .bind(&tx.wtid)
    420         .bind(&subject)
    421         .bind(tx.amount)
    422         .bind(tx.exchange_base_url.as_str())
    423         .bind(&tx.metadata)
    424         .bind(tx.creditor.iban())
    425         .bind(&tx.creditor.name)
    426         .bind_timestamp(now)
    427         .try_map(|r: PgRow| {
    428             Ok(if r.try_get_flag(0)? {
    429                 TransferResult::RequestUidReuse
    430             } else if r.try_get_flag(1)? {
    431                 TransferResult::WtidReuse
    432             } else {
    433                 TransferResult::Success {
    434                     id: r.try_get_u64(2)?,
    435                     initiated_at: r.try_get_timestamp(3)?,
    436                 }
    437             })
    438         })
    439         .fetch_one(db)
    440     )
    441 }
    442 
/// Outcome of [`register_bounce_tx_in`].
#[derive(Debug, PartialEq, Eq)]
pub struct BounceResult {
    /// Value of the `out_tx_row_id` column.
    pub tx_id: u64,
    /// Value of the `out_tx_new` column.
    pub tx_new: bool,
    /// Value of the `out_bounce_row_id` column.
    pub bounce_id: u64,
    /// Value of the `out_bounce_new` column.
    pub bounce_new: bool,
}
    450 
    451 pub async fn register_bounce_tx_in(
    452     db: &mut PgConnection,
    453     tx: &TxIn,
    454     reason: &str,
    455     now: &Timestamp,
    456 ) -> sqlx::Result<BounceResult> {
    457     serialized!(
    458         sqlx::query(
    459             "
    460                 SELECT out_tx_row_id, out_tx_new, out_bounce_row_id, out_bounce_new
    461                 FROM register_bounce_tx_in($1, $2, $3, $4, $5, $6, $7, $8)
    462             ",
    463         )
    464         .bind(tx.code as i64)
    465         .bind(tx.amount)
    466         .bind(&tx.subject)
    467         .bind(tx.debtor.iban())
    468         .bind(&tx.debtor.name)
    469         .bind_date(&tx.value_date)
    470         .bind(reason)
    471         .bind_timestamp(now)
    472         .try_map(|r: PgRow| {
    473             Ok(BounceResult {
    474                 tx_id: r.try_get_u64(0)?,
    475                 tx_new: r.try_get(1)?,
    476                 bounce_id: r.try_get_u64(2)?,
    477                 bounce_new: r.try_get(3)?,
    478             })
    479         })
    480         .fetch_one(&mut *db)
    481     )
    482 }
    483 
    484 pub async fn transfer_page(
    485     db: &PgPool,
    486     status: &Option<TransferState>,
    487     params: &Page,
    488 ) -> sqlx::Result<Vec<TransferListStatus>> {
    489     page(
    490         db,
    491         params,
    492         "initiated_id",
    493         || {
    494             let mut builder = QueryBuilder::new(
    495                 "
    496                     SELECT
    497                         initiated_id,
    498                         status,
    499                         amount,
    500                         credit_account,
    501                         credit_name,
    502                         initiated_at
    503                     FROM transfer
    504                     JOIN initiated USING (initiated_id)
    505                     WHERE
    506                 ",
    507             );
    508             if let Some(status) = status {
    509                 builder.push(" status = ").push_bind(status).push(" AND ");
    510             }
    511             builder
    512         },
    513         |r: PgRow| {
    514             Ok(TransferListStatus {
    515                 row_id: r.try_get_u64(0)?,
    516                 status: r.try_get(1)?,
    517                 amount: r.try_get_amount(2, &CURR)?,
    518                 credit_account: r.try_get_iban(3)?.as_full_uri(r.try_get(4)?),
    519                 timestamp: r.try_get_timestamp(5)?.into(),
    520             })
    521         },
    522     )
    523     .await
    524 }
    525 
    526 pub async fn outgoing_history(
    527     db: &PgPool,
    528     params: &History,
    529     listen: impl FnOnce() -> Receiver<i64>,
    530 ) -> sqlx::Result<Vec<OutgoingBankTransaction>> {
    531     history(
    532         db,
    533         "tx_out_id",
    534         params,
    535         listen,
    536         || {
    537             QueryBuilder::new(
    538                 "
    539                 SELECT
    540                     tx_out_id,
    541                     amount,
    542                     credit_account,
    543                     credit_name,
    544                     valued_at,
    545                     exchange_base_url,
    546                     metadata,
    547                     wtid
    548                 FROM taler_out
    549                 JOIN tx_out USING (tx_out_id)
    550                 WHERE
    551             ",
    552             )
    553         },
    554         |r: PgRow| {
    555             Ok(OutgoingBankTransaction {
    556                 row_id: r.try_get_u64(0)?,
    557                 amount: r.try_get_amount(1, &CURR)?,
    558                 debit_fee: None,
    559                 credit_account: r.try_get_iban(2)?.as_full_uri(r.try_get(3)?),
    560                 date: r.try_get_timestamp(4)?.into(),
    561                 exchange_base_url: r.try_get_url(5)?,
    562                 metadata: r.try_get(6)?,
    563                 wtid: r.try_get(7)?,
    564             })
    565         },
    566     )
    567     .await
    568 }
    569 
    570 pub async fn incoming_history(
    571     db: &PgPool,
    572     params: &History,
    573     listen: impl FnOnce() -> Receiver<i64>,
    574 ) -> sqlx::Result<Vec<IncomingBankTransaction>> {
    575     history(
    576         db,
    577         "tx_in_id",
    578         params,
    579         listen,
    580         || {
    581             QueryBuilder::new(
    582                 "
    583                 SELECT
    584                     type,
    585                     tx_in_id,
    586                     amount,
    587                     debit_account,
    588                     debit_name,
    589                     valued_at,
    590                     metadata,
    591                     authorization_pub,
    592                     authorization_sig
    593                 FROM taler_in
    594                 JOIN tx_in USING (tx_in_id)
    595                 WHERE
    596             ",
    597             )
    598         },
    599         |r: PgRow| {
    600             Ok(match r.try_get(0)? {
    601                 IncomingType::reserve => IncomingBankTransaction::Reserve {
    602                     row_id: r.try_get_u64(1)?,
    603                     amount: r.try_get_amount(2, &CURR)?,
    604                     credit_fee: None,
    605                     debit_account: r.try_get_iban(3)?.as_full_uri(r.try_get(4)?),
    606                     date: r.try_get_timestamp(5)?.into(),
    607                     reserve_pub: r.try_get(6)?,
    608                     authorization_pub: r.try_get(7)?,
    609                     authorization_sig: r.try_get(8)?,
    610                 },
    611                 IncomingType::kyc => IncomingBankTransaction::Kyc {
    612                     row_id: r.try_get_u64(1)?,
    613                     amount: r.try_get_amount(2, &CURR)?,
    614                     credit_fee: None,
    615                     debit_account: r.try_get_iban(3)?.as_full_uri(r.try_get(4)?),
    616                     date: r.try_get_timestamp(5)?.into(),
    617                     account_pub: r.try_get(6)?,
    618                     authorization_pub: r.try_get(7)?,
    619                     authorization_sig: r.try_get(8)?,
    620                 },
    621                 IncomingType::map => unimplemented!("MAP are never listed in the history"),
    622             })
    623         },
    624     )
    625     .await
    626 }
    627 
    628 pub async fn revenue_history(
    629     db: &PgPool,
    630     params: &History,
    631     listen: impl FnOnce() -> Receiver<i64>,
    632 ) -> sqlx::Result<Vec<RevenueIncomingBankTransaction>> {
    633     history(
    634         db,
    635         "tx_in_id",
    636         params,
    637         listen,
    638         || {
    639             QueryBuilder::new(
    640                 "
    641                 SELECT
    642                     tx_in_id,
    643                     valued_at,
    644                     amount,
    645                     debit_account,
    646                     debit_name,
    647                     subject
    648                 FROM tx_in
    649                 WHERE
    650             ",
    651             )
    652         },
    653         |r: PgRow| {
    654             Ok(RevenueIncomingBankTransaction {
    655                 row_id: r.try_get_u64(0)?,
    656                 date: r.try_get_timestamp(1)?.into(),
    657                 amount: r.try_get_amount(2, &CURR)?,
    658                 credit_fee: None,
    659                 debit_account: r.try_get_iban(3)?.as_full_uri(r.try_get(4)?),
    660                 subject: r.try_get(5)?,
    661             })
    662         },
    663     )
    664     .await
    665 }
    666 
    667 pub async fn transfer_by_id(db: &PgPool, id: u64) -> sqlx::Result<Option<TransferStatus>> {
    668     serialized!(
    669         sqlx::query(
    670             "
    671                 SELECT
    672                     status,
    673                     status_msg,
    674                     amount,
    675                     exchange_base_url,
    676                     metadata,
    677                     wtid,
    678                     credit_account,
    679                     credit_name,
    680                     initiated_at
    681                 FROM transfer
    682                 JOIN initiated USING (initiated_id)
    683                 WHERE initiated_id = $1
    684             ",
    685         )
    686         .bind(id as i64)
    687         .try_map(|r: PgRow| {
    688             Ok(TransferStatus {
    689                 status: r.try_get(0)?,
    690                 status_msg: r.try_get(1)?,
    691                 amount: r.try_get_amount(2, &CURR)?,
    692                 origin_exchange_url: r.try_get(3)?,
    693                 metadata: r.try_get(4)?,
    694                 wtid: r.try_get(5)?,
    695                 credit_account: r.try_get_iban(6)?.as_full_uri(r.try_get(7)?),
    696                 timestamp: r.try_get_timestamp(8)?.into(),
    697             })
    698         })
    699         .fetch_optional(db)
    700     )
    701 }
    702 
    703 /** Get a batch of pending initiated transactions not attempted since [start] */
    704 pub async fn pending_batch(
    705     db: &mut PgConnection,
    706     start: &Timestamp,
    707 ) -> sqlx::Result<Vec<Initiated>> {
    708     serialized!(
    709         sqlx::query(
    710             "
    711                 SELECT initiated_id, amount, subject, credit_account, credit_name
    712                 FROM initiated
    713                 WHERE magnet_code IS NULL
    714                     AND status='pending'
    715                     AND (last_submitted IS NULL OR last_submitted < $1)
    716                 LIMIT 100
    717             ",
    718         )
    719         .bind_timestamp(start)
    720         .try_map(|r: PgRow| {
    721             Ok(Initiated {
    722                 id: r.try_get_u64(0)?,
    723                 amount: r.try_get_amount(1, &CURR)?,
    724                 subject: r.try_get(2)?,
    725                 creditor: FullHuPayto::new(r.try_get_parse(3)?, r.try_get(4)?),
    726             })
    727         })
    728         .fetch_all(&mut *db)
    729     )
    730 }
    731 
    732 /** Get an initiated transaction matching the given magnet [code] */
    733 pub async fn initiated_by_code(
    734     db: &mut PgConnection,
    735     code: u64,
    736 ) -> sqlx::Result<Option<Initiated>> {
    737     serialized!(
    738         sqlx::query(
    739             "
    740                 SELECT initiated_id, amount, subject, credit_account, credit_name
    741                 FROM initiated
    742                 WHERE magnet_code IS $1
    743             ",
    744         )
    745         .bind(code as i64)
    746         .try_map(|r: PgRow| {
    747             Ok(Initiated {
    748                 id: r.try_get_u64(0)?,
    749                 amount: r.try_get_amount(1, &CURR)?,
    750                 subject: r.try_get(2)?,
    751                 creditor: FullHuPayto::new(r.try_get_parse(3)?, r.try_get(4)?),
    752             })
    753         })
    754         .fetch_optional(&mut *db)
    755     )
    756 }
    757 
    758 /** Update status of a successful submitted initiated transaction */
    759 pub async fn initiated_submit_success(
    760     db: &mut PgConnection,
    761     id: u64,
    762     timestamp: &Timestamp,
    763     magnet_code: u64,
    764 ) -> sqlx::Result<()> {
    765     serialized!(
    766         sqlx::query(
    767             "
    768                 UPDATE initiated
    769                 SET status='pending', submission_counter=submission_counter+1, last_submitted=$1, magnet_code=$2
    770                 WHERE initiated_id=$3
    771             "
    772         ).bind_timestamp(timestamp)
    773         .bind(magnet_code as i64)
    774         .bind(id as i64)
    775         .execute(&mut *db)
    776     )?;
    777     Ok(())
    778 }
    779 
    780 /** Update status of a permanently failed initiated transaction */
    781 pub async fn initiated_submit_permanent_failure(
    782     db: &mut PgConnection,
    783     id: u64,
    784     timestamp: &Timestamp,
    785     msg: &str,
    786 ) -> sqlx::Result<()> {
    787     serialized!(
    788         sqlx::query(
    789             "
    790                 UPDATE initiated
    791                 SET status='permanent_failure', status_msg=$2
    792                 WHERE initiated_id=$3
    793             ",
    794         )
    795         .bind_timestamp(timestamp)
    796         .bind(msg)
    797         .bind(id as i64)
    798         .execute(&mut *db)
    799     )?;
    800     Ok(())
    801 }
    802 
    803 /** Check if an initiated transaction exist for a magnet code */
    804 pub async fn initiated_exists_for_code(
    805     db: &mut PgConnection,
    806     code: u64,
    807 ) -> sqlx::Result<Option<u64>> {
    808     serialized!(
    809         sqlx::query("SELECT initiated_id FROM initiated WHERE magnet_code=$1")
    810             .bind(code as i64)
    811             .try_map(|r| Ok(r.try_get::<i64, _>(0)? as u64))
    812             .fetch_optional(&mut *db)
    813     )
    814 }
    815 
    816 /** Get JSON value from KV table */
    817 pub async fn kv_get<T: DeserializeOwned + Unpin + Send>(
    818     db: &mut PgConnection,
    819     key: &str,
    820 ) -> sqlx::Result<Option<T>> {
    821     serialized!(
    822         sqlx::query("SELECT value FROM kv WHERE key=$1")
    823             .bind(key)
    824             .try_map(|r| Ok(r.try_get::<sqlx::types::Json<T>, _>(0)?.0))
    825             .fetch_optional(&mut *db)
    826     )
    827 }
    828 
    829 /** Set JSON value in KV table */
    830 pub async fn kv_set<T: Serialize>(db: &mut PgConnection, key: &str, value: &T) -> sqlx::Result<()> {
    831     serialized!(
    832         sqlx::query("INSERT INTO kv (key, value) VALUES ($1, $2) ON CONFLICT (key) DO UPDATE SET value=EXCLUDED.value")
    833             .bind(key)
    834             .bind(sqlx::types::Json(value))
    835             .execute(&mut *db)
    836     )?;
    837     Ok(())
    838 }
    839 
    840 pub enum RegistrationResult {
    841     Success,
    842     ReservePubReuse,
    843 }
    844 
    845 pub async fn transfer_register(
    846     db: &PgPool,
    847     req: &RegistrationRequest,
    848 ) -> sqlx::Result<RegistrationResult> {
    849     let ty: IncomingType = req.r#type.into();
    850     serialized!(
    851         sqlx::query(
    852             "SELECT out_reserve_pub_reuse FROM register_prepared_transfers($1,$2,$3,$4,$5,$6)"
    853         )
    854         .bind(ty)
    855         .bind(&req.account_pub)
    856         .bind(&req.authorization_pub)
    857         .bind(&req.authorization_sig)
    858         .bind(req.recurrent)
    859         .bind_timestamp(&Timestamp::now())
    860         .try_map(|r: PgRow| {
    861             Ok(if r.try_get_flag("out_reserve_pub_reuse")? {
    862                 RegistrationResult::ReservePubReuse
    863             } else {
    864                 RegistrationResult::Success
    865             })
    866         })
    867         .fetch_one(db)
    868     )
    869 }
    870 
    871 pub async fn transfer_unregister(db: &PgPool, req: &Unregistration) -> sqlx::Result<bool> {
    872     serialized!(
    873         sqlx::query("SELECT out_found FROM delete_prepared_transfers($1,$2)")
    874             .bind(&req.authorization_pub)
    875             .bind_timestamp(&Timestamp::now())
    876             .try_map(|r: PgRow| r.try_get_flag("out_found"))
    877             .fetch_one(db)
    878     )
    879 }
    880 
    881 #[cfg(test)]
    882 mod test {
    883     use std::assert_matches;
    884 
    885     use jiff::{Span, Timestamp, Zoned};
    886     use serde_json::json;
    887     use sqlx::{PgPool, Postgres, pool::PoolConnection, postgres::PgRow};
    888     use taler_api::{
    889         db::TypeHelper,
    890         notification::dummy_listen,
    891         subject::{IncomingSubject, OutgoingSubject},
    892     };
    893     use taler_common::{
    894         api::{
    895             EddsaPublicKey, HashCode, ShortHashCode,
    896             params::{History, Page},
    897         },
    898         types::{
    899             amount::{amount, decimal},
    900             url,
    901             utils::now_sql_stable_ts,
    902         },
    903     };
    904 
    905     use super::TxInAdmin;
    906     use crate::{
    907         constants::CONFIG_SOURCE,
    908         db::{
    909             self, AddIncomingResult, AddOutgoingResult, BounceResult, Initiated, OutFailureResult,
    910             TransferResult, TxIn, TxOut, TxOutKind, kv_get, kv_set, make_transfer,
    911             register_bounce_tx_in, register_tx_in, register_tx_in_admin, register_tx_out,
    912         },
    913         magnet_api::types::TxStatus,
    914         magnet_payto,
    915     };
    916 
    917     async fn setup() -> (PoolConnection<Postgres>, PgPool) {
    918         taler_test_utils::db::db_test_setup(CONFIG_SOURCE).await
    919     }
    920 
    921     #[tokio::test]
    922     async fn kv() {
    923         let (mut db, _) = setup().await;
    924 
    925         let value = json!({
    926             "name": "Mr Smith",
    927             "no way": 32
    928         });
    929 
    930         assert_eq!(
    931             kv_get::<serde_json::Value>(&mut db, "value").await.unwrap(),
    932             None
    933         );
    934         kv_set(&mut db, "value", &value).await.unwrap();
    935         kv_set(&mut db, "value", &value).await.unwrap();
    936         assert_eq!(
    937             kv_get::<serde_json::Value>(&mut db, "value").await.unwrap(),
    938             Some(value)
    939         );
    940     }
    941 
    942     #[tokio::test]
    943     async fn tx_in() {
    944         let (mut db, pool) = setup().await;
    945 
    946         let mut routine = async |first: &Option<IncomingSubject>,
    947                                  second: &Option<IncomingSubject>| {
    948             let (id, code) =
    949                 sqlx::query("SELECT count(*) + 1, COALESCE(max(magnet_code), 0) + 20 FROM tx_in")
    950                     .try_map(|r: PgRow| Ok((r.try_get_u64(0)?, r.try_get_u64(1)?)))
    951                     .fetch_one(&mut *db)
    952                     .await
    953                     .unwrap();
    954             let now = now_sql_stable_ts();
    955             let date = Zoned::now().date();
    956             let later = date.tomorrow().unwrap();
    957             let tx = TxIn {
    958                 code,
    959                 amount: amount("EUR:10"),
    960                 subject: "subject".into(),
    961                 debtor: magnet_payto(
    962                     "payto://iban/HU30162000031000163100000000?receiver-name=name",
    963                 ),
    964                 value_date: date,
    965                 status: TxStatus::Completed,
    966             };
    967             // Insert
    968             assert_eq!(
    969                 register_tx_in(&mut db, &tx, first, &now)
    970                     .await
    971                     .expect("register tx in"),
    972                 AddIncomingResult::Success {
    973                     new: true,
    974                     pending: false,
    975                     row_id: id,
    976                     valued_at: date
    977                 }
    978             );
    979             // Idempotent
    980             assert_eq!(
    981                 register_tx_in(
    982                     &mut db,
    983                     &TxIn {
    984                         value_date: later,
    985                         ..tx.clone()
    986                     },
    987                     first,
    988                     &now
    989                 )
    990                 .await
    991                 .expect("register tx in"),
    992                 AddIncomingResult::Success {
    993                     new: false,
    994                     pending: false,
    995                     row_id: id,
    996                     valued_at: date
    997                 }
    998             );
    999             // Many
   1000             assert_eq!(
   1001                 register_tx_in(
   1002                     &mut db,
   1003                     &TxIn {
   1004                         code: code + 1,
   1005                         value_date: later,
   1006                         ..tx
   1007                     },
   1008                     second,
   1009                     &now
   1010                 )
   1011                 .await
   1012                 .expect("register tx in"),
   1013                 AddIncomingResult::Success {
   1014                     new: true,
   1015                     pending: false,
   1016                     row_id: id + 1,
   1017                     valued_at: later
   1018                 }
   1019             );
   1020         };
   1021 
   1022         // Empty db
   1023         assert_eq!(
   1024             db::revenue_history(&pool, &History::default(), dummy_listen)
   1025                 .await
   1026                 .unwrap(),
   1027             Vec::new()
   1028         );
   1029         assert_eq!(
   1030             db::incoming_history(&pool, &History::default(), dummy_listen)
   1031                 .await
   1032                 .unwrap(),
   1033             Vec::new()
   1034         );
   1035 
   1036         // Regular transaction
   1037         routine(&None, &None).await;
   1038 
   1039         // Reserve transaction
   1040         routine(
   1041             &Some(IncomingSubject::Reserve(EddsaPublicKey::rand())),
   1042             &Some(IncomingSubject::Reserve(EddsaPublicKey::rand())),
   1043         )
   1044         .await;
   1045 
   1046         // Kyc transaction
   1047         routine(
   1048             &Some(IncomingSubject::Kyc(EddsaPublicKey::rand())),
   1049             &Some(IncomingSubject::Kyc(EddsaPublicKey::rand())),
   1050         )
   1051         .await;
   1052 
   1053         // History
   1054         assert_eq!(
   1055             db::revenue_history(&pool, &History::default(), dummy_listen)
   1056                 .await
   1057                 .unwrap()
   1058                 .len(),
   1059             6
   1060         );
   1061         assert_eq!(
   1062             db::incoming_history(&pool, &History::default(), dummy_listen)
   1063                 .await
   1064                 .unwrap()
   1065                 .len(),
   1066             4
   1067         );
   1068     }
   1069 
   1070     #[tokio::test]
   1071     async fn tx_in_admin() {
   1072         let (_, pool) = setup().await;
   1073 
   1074         // Empty db
   1075         assert_eq!(
   1076             db::incoming_history(&pool, &History::default(), dummy_listen)
   1077                 .await
   1078                 .unwrap(),
   1079             Vec::new()
   1080         );
   1081 
   1082         let now = now_sql_stable_ts();
   1083         let later = now + Span::new().hours(2);
   1084         let date = Zoned::now().date();
   1085         let tx = TxInAdmin {
   1086             amount: amount("EUR:10"),
   1087             subject: "subject".to_owned(),
   1088             debtor: magnet_payto("payto://iban/HU30162000031000163100000000?receiver-name=name"),
   1089             metadata: IncomingSubject::Reserve(EddsaPublicKey::rand()),
   1090         };
   1091         // Insert
   1092         assert_eq!(
   1093             register_tx_in_admin(&pool, &tx, &now)
   1094                 .await
   1095                 .expect("register tx in"),
   1096             AddIncomingResult::Success {
   1097                 new: true,
   1098                 pending: false,
   1099                 row_id: 1,
   1100                 valued_at: date
   1101             }
   1102         );
   1103         // Many
   1104         assert_eq!(
   1105             register_tx_in_admin(
   1106                 &pool,
   1107                 &TxInAdmin {
   1108                     subject: "Other".to_owned(),
   1109                     metadata: IncomingSubject::Reserve(EddsaPublicKey::rand()),
   1110                     ..tx.clone()
   1111                 },
   1112                 &later
   1113             )
   1114             .await
   1115             .expect("register tx in"),
   1116             AddIncomingResult::Success {
   1117                 new: true,
   1118                 pending: false,
   1119                 row_id: 2,
   1120                 valued_at: date
   1121             }
   1122         );
   1123 
   1124         // History
   1125         assert_eq!(
   1126             db::incoming_history(&pool, &History::default(), dummy_listen)
   1127                 .await
   1128                 .unwrap()
   1129                 .len(),
   1130             2
   1131         );
   1132     }
   1133 
   1134     #[tokio::test]
   1135     async fn tx_out() {
   1136         let (mut db, pool) = setup().await;
   1137 
   1138         let mut routine = async |first: &TxOutKind, second: &TxOutKind| {
   1139             let (id, code) =
   1140                 sqlx::query("SELECT count(*) + 1, COALESCE(max(magnet_code), 0) + 20 FROM tx_out")
   1141                     .try_map(|r: PgRow| Ok((r.try_get_u64(0)?, r.try_get_u64(1)?)))
   1142                     .fetch_one(&mut *db)
   1143                     .await
   1144                     .unwrap();
   1145             let now = now_sql_stable_ts();
   1146             let date = Zoned::now().date();
   1147             let later = date.tomorrow().unwrap();
   1148             let tx = TxOut {
   1149                 code,
   1150                 amount: amount("HUF:10"),
   1151                 subject: "subject".into(),
   1152                 creditor: magnet_payto(
   1153                     "payto://iban/HU30162000031000163100000000?receiver-name=name",
   1154                 ),
   1155                 value_date: date,
   1156                 status: TxStatus::Completed,
   1157             };
   1158             assert_matches!(
   1159                 make_transfer(
   1160                     &pool,
   1161                     &db::Transfer {
   1162                         request_uid: HashCode::rand(),
   1163                         amount: decimal("10"),
   1164                         exchange_base_url: url("https://exchange.test.com/"),
   1165                         metadata: None,
   1166                         wtid: ShortHashCode::rand(),
   1167                         creditor: tx.creditor.clone()
   1168                     },
   1169                     &now
   1170                 )
   1171                 .await
   1172                 .unwrap(),
   1173                 TransferResult::Success { .. }
   1174             );
   1175             db::initiated_submit_success(&mut db, 1, &Timestamp::now(), tx.code)
   1176                 .await
   1177                 .expect("status success");
   1178 
   1179             // Insert
   1180             assert_eq!(
   1181                 register_tx_out(&mut db, &tx, first, &now)
   1182                     .await
   1183                     .expect("register tx out"),
   1184                 AddOutgoingResult {
   1185                     result: db::RegisterResult::known,
   1186                     row_id: id,
   1187                 }
   1188             );
   1189             // Idempotent
   1190             assert_eq!(
   1191                 register_tx_out(
   1192                     &mut db,
   1193                     &TxOut {
   1194                         value_date: later,
   1195                         ..tx.clone()
   1196                     },
   1197                     first,
   1198                     &now
   1199                 )
   1200                 .await
   1201                 .expect("register tx out"),
   1202                 AddOutgoingResult {
   1203                     result: db::RegisterResult::idempotent,
   1204                     row_id: id,
   1205                 }
   1206             );
   1207             // Recovered
   1208             assert_eq!(
   1209                 register_tx_out(
   1210                     &mut db,
   1211                     &TxOut {
   1212                         code: code + 1,
   1213                         value_date: later,
   1214                         ..tx.clone()
   1215                     },
   1216                     second,
   1217                     &now
   1218                 )
   1219                 .await
   1220                 .expect("register tx out"),
   1221                 AddOutgoingResult {
   1222                     result: db::RegisterResult::recovered,
   1223                     row_id: id + 1,
   1224                 }
   1225             );
   1226         };
   1227 
   1228         // Empty db
   1229         assert_eq!(
   1230             db::outgoing_history(&pool, &History::default(), dummy_listen)
   1231                 .await
   1232                 .unwrap(),
   1233             Vec::new()
   1234         );
   1235 
   1236         // Regular transaction
   1237         routine(&TxOutKind::Simple, &TxOutKind::Simple).await;
   1238 
   1239         // Talerable transaction
   1240         routine(
   1241             &TxOutKind::Talerable(OutgoingSubject::rand()),
   1242             &TxOutKind::Talerable(OutgoingSubject::rand()),
   1243         )
   1244         .await;
   1245 
   1246         // Bounced transaction
   1247         routine(&TxOutKind::Bounce(21), &TxOutKind::Bounce(42)).await;
   1248 
   1249         // History
   1250         assert_eq!(
   1251             db::outgoing_history(&pool, &History::default(), dummy_listen)
   1252                 .await
   1253                 .unwrap()
   1254                 .len(),
   1255             2
   1256         );
   1257     }
   1258 
   1259     #[tokio::test]
   1260     async fn tx_out_failure() {
   1261         let (mut db, pool) = setup().await;
   1262 
   1263         let now = now_sql_stable_ts();
   1264 
   1265         // Unknown
   1266         assert_eq!(
   1267             db::register_tx_out_failure(&mut db, 42, None, &now)
   1268                 .await
   1269                 .unwrap(),
   1270             OutFailureResult {
   1271                 initiated_id: None,
   1272                 new: false
   1273             }
   1274         );
   1275         assert_eq!(
   1276             db::register_tx_out_failure(&mut db, 42, Some(12), &now)
   1277                 .await
   1278                 .unwrap(),
   1279             OutFailureResult {
   1280                 initiated_id: None,
   1281                 new: false
   1282             }
   1283         );
   1284 
   1285         // Initiated
   1286         let req = db::Transfer {
   1287             request_uid: HashCode::rand(),
   1288             amount: decimal("10"),
   1289             exchange_base_url: url("https://exchange.test.com/"),
   1290             metadata: None,
   1291             wtid: ShortHashCode::rand(),
   1292             creditor: magnet_payto("payto://iban/HU30162000031000163100000000?receiver-name=name"),
   1293         };
   1294         let payto = magnet_payto("payto://iban/HU30162000031000163100000000?receiver-name=name");
   1295         assert_eq!(
   1296             make_transfer(&pool, &req, &now).await.unwrap(),
   1297             TransferResult::Success {
   1298                 id: 1,
   1299                 initiated_at: now
   1300             }
   1301         );
   1302         db::initiated_submit_success(&mut db, 1, &Timestamp::now(), 34)
   1303             .await
   1304             .expect("status success");
   1305         assert_eq!(
   1306             db::register_tx_out_failure(&mut db, 34, None, &now)
   1307                 .await
   1308                 .unwrap(),
   1309             OutFailureResult {
   1310                 initiated_id: Some(1),
   1311                 new: true
   1312             }
   1313         );
   1314         assert_eq!(
   1315             db::register_tx_out_failure(&mut db, 34, None, &now)
   1316                 .await
   1317                 .unwrap(),
   1318             OutFailureResult {
   1319                 initiated_id: Some(1),
   1320                 new: false
   1321             }
   1322         );
   1323 
   1324         // Recovered bounce
   1325         let tx = TxIn {
   1326             code: 12,
   1327             amount: amount("HUF:11"),
   1328             subject: "malformed transaction".into(),
   1329             debtor: payto,
   1330             value_date: Zoned::now().date(),
   1331             status: TxStatus::Completed,
   1332         };
   1333         assert_eq!(
   1334             db::register_bounce_tx_in(&mut db, &tx, "no reason", &now)
   1335                 .await
   1336                 .unwrap(),
   1337             BounceResult {
   1338                 tx_id: 1,
   1339                 tx_new: true,
   1340                 bounce_id: 2,
   1341                 bounce_new: true
   1342             }
   1343         );
   1344         assert_eq!(
   1345             db::register_tx_out_failure(&mut db, 10, Some(12), &now)
   1346                 .await
   1347                 .unwrap(),
   1348             OutFailureResult {
   1349                 initiated_id: Some(2),
   1350                 new: true
   1351             }
   1352         );
   1353         assert_eq!(
   1354             db::register_tx_out_failure(&mut db, 10, Some(12), &now)
   1355                 .await
   1356                 .unwrap(),
   1357             OutFailureResult {
   1358                 initiated_id: Some(2),
   1359                 new: false
   1360             }
   1361         );
   1362     }
   1363 
   1364     #[tokio::test]
   1365     async fn transfer() {
   1366         let (_, pool) = setup().await;
   1367 
   1368         // Empty db
   1369         assert_eq!(db::transfer_by_id(&pool, 0).await.unwrap(), None);
   1370         assert_eq!(
   1371             db::transfer_page(&pool, &None, &Page::default())
   1372                 .await
   1373                 .unwrap(),
   1374             Vec::new()
   1375         );
   1376 
   1377         let req = db::Transfer {
   1378             request_uid: HashCode::rand(),
   1379             amount: decimal("10"),
   1380             exchange_base_url: url("https://exchange.test.com/"),
   1381             metadata: None,
   1382             wtid: ShortHashCode::rand(),
   1383             creditor: magnet_payto("payto://iban/HU02162000031000164800000000?receiver-name=name"),
   1384         };
   1385         let now = now_sql_stable_ts();
   1386         let later = now + Span::new().hours(2);
   1387         // Insert
   1388         assert_eq!(
   1389             make_transfer(&pool, &req, &now).await.expect("transfer"),
   1390             TransferResult::Success {
   1391                 id: 1,
   1392                 initiated_at: now
   1393             }
   1394         );
   1395         // Idempotent
   1396         assert_eq!(
   1397             make_transfer(&pool, &req, &later).await.expect("transfer"),
   1398             TransferResult::Success {
   1399                 id: 1,
   1400                 initiated_at: now
   1401             }
   1402         );
   1403         // Request UID reuse
   1404         assert_eq!(
   1405             make_transfer(
   1406                 &pool,
   1407                 &db::Transfer {
   1408                     wtid: ShortHashCode::rand(),
   1409                     ..req.clone()
   1410                 },
   1411                 &now
   1412             )
   1413             .await
   1414             .expect("transfer"),
   1415             TransferResult::RequestUidReuse
   1416         );
   1417         // wtid reuse
   1418         assert_eq!(
   1419             make_transfer(
   1420                 &pool,
   1421                 &db::Transfer {
   1422                     request_uid: HashCode::rand(),
   1423                     ..req.clone()
   1424                 },
   1425                 &now
   1426             )
   1427             .await
   1428             .expect("transfer"),
   1429             TransferResult::WtidReuse
   1430         );
   1431         // Many
   1432         assert_eq!(
   1433             make_transfer(
   1434                 &pool,
   1435                 &db::Transfer {
   1436                     request_uid: HashCode::rand(),
   1437                     wtid: ShortHashCode::rand(),
   1438                     ..req
   1439                 },
   1440                 &later
   1441             )
   1442             .await
   1443             .expect("transfer"),
   1444             TransferResult::Success {
   1445                 id: 2,
   1446                 initiated_at: later
   1447             }
   1448         );
   1449 
   1450         // Get
   1451         assert!(db::transfer_by_id(&pool, 1).await.unwrap().is_some());
   1452         assert!(db::transfer_by_id(&pool, 2).await.unwrap().is_some());
   1453         assert!(db::transfer_by_id(&pool, 3).await.unwrap().is_none());
   1454         assert_eq!(
   1455             db::transfer_page(&pool, &None, &Page::default())
   1456                 .await
   1457                 .unwrap()
   1458                 .len(),
   1459             2
   1460         );
   1461     }
   1462 
   1463     #[tokio::test]
   1464     async fn bounce() {
   1465         let (mut db, _) = setup().await;
   1466 
   1467         let amount = amount("HUF:10");
   1468         let payto = magnet_payto("payto://iban/HU30162000031000163100000000?receiver-name=name");
   1469         let now = now_sql_stable_ts();
   1470         let date = Zoned::now().date();
   1471 
   1472         // Empty db
   1473         assert!(db::pending_batch(&mut db, &now).await.unwrap().is_empty());
   1474 
   1475         // Insert
   1476         assert_eq!(
   1477             register_tx_in(
   1478                 &mut db,
   1479                 &TxIn {
   1480                     code: 13,
   1481                     amount,
   1482                     subject: "subject".into(),
   1483                     debtor: payto.clone(),
   1484                     value_date: date,
   1485                     status: TxStatus::Completed
   1486                 },
   1487                 &None,
   1488                 &now
   1489             )
   1490             .await
   1491             .expect("register tx in"),
   1492             AddIncomingResult::Success {
   1493                 new: true,
   1494                 pending: false,
   1495                 row_id: 1,
   1496                 valued_at: date
   1497             }
   1498         );
   1499 
   1500         // Bounce
   1501         assert_eq!(
   1502             register_bounce_tx_in(
   1503                 &mut db,
   1504                 &TxIn {
   1505                     code: 12,
   1506                     amount,
   1507                     subject: "subject".into(),
   1508                     debtor: payto.clone(),
   1509                     value_date: date,
   1510                     status: TxStatus::Completed
   1511                 },
   1512                 "good reason",
   1513                 &now
   1514             )
   1515             .await
   1516             .expect("bounce"),
   1517             BounceResult {
   1518                 tx_id: 2,
   1519                 tx_new: true,
   1520                 bounce_id: 1,
   1521                 bounce_new: true
   1522             }
   1523         );
   1524         // Idempotent
   1525         assert_eq!(
   1526             register_bounce_tx_in(
   1527                 &mut db,
   1528                 &TxIn {
   1529                     code: 12,
   1530                     amount,
   1531                     subject: "subject".into(),
   1532                     debtor: payto.clone(),
   1533                     value_date: date,
   1534                     status: TxStatus::Completed
   1535                 },
   1536                 "good reason",
   1537                 &now
   1538             )
   1539             .await
   1540             .expect("bounce"),
   1541             BounceResult {
   1542                 tx_id: 2,
   1543                 tx_new: false,
   1544                 bounce_id: 1,
   1545                 bounce_new: false
   1546             }
   1547         );
   1548 
   1549         // Bounce registered
   1550         assert_eq!(
   1551             register_bounce_tx_in(
   1552                 &mut db,
   1553                 &TxIn {
   1554                     code: 13,
   1555                     amount,
   1556                     subject: "subject".into(),
   1557                     debtor: payto.clone(),
   1558                     value_date: date,
   1559                     status: TxStatus::Completed
   1560                 },
   1561                 "good reason",
   1562                 &now
   1563             )
   1564             .await
   1565             .expect("bounce"),
   1566             BounceResult {
   1567                 tx_id: 1,
   1568                 tx_new: false,
   1569                 bounce_id: 2,
   1570                 bounce_new: true
   1571             }
   1572         );
   1573         // Idempotent registered
   1574         assert_eq!(
   1575             register_bounce_tx_in(
   1576                 &mut db,
   1577                 &TxIn {
   1578                     code: 13,
   1579                     amount,
   1580                     subject: "subject".into(),
   1581                     debtor: payto.clone(),
   1582                     value_date: date,
   1583                     status: TxStatus::Completed
   1584                 },
   1585                 "good reason",
   1586                 &now
   1587             )
   1588             .await
   1589             .expect("bounce"),
   1590             BounceResult {
   1591                 tx_id: 1,
   1592                 tx_new: false,
   1593                 bounce_id: 2,
   1594                 bounce_new: false
   1595             }
   1596         );
   1597 
   1598         // Batch
   1599         assert_eq!(
   1600             db::pending_batch(&mut db, &now).await.unwrap(),
   1601             &[
   1602                 Initiated {
   1603                     id: 1,
   1604                     amount,
   1605                     subject: "bounce: 12".into(),
   1606                     creditor: payto.clone()
   1607                 },
   1608                 Initiated {
   1609                     id: 2,
   1610                     amount,
   1611                     subject: "bounce: 13".into(),
   1612                     creditor: payto
   1613                 }
   1614             ]
   1615         );
   1616     }
   1617 
   1618     #[tokio::test]
   1619     async fn status() {
   1620         let (mut db, _) = setup().await;
   1621 
   1622         // Unknown transfer
   1623         db::initiated_submit_permanent_failure(&mut db, 1, &Timestamp::now(), "msg")
   1624             .await
   1625             .unwrap();
   1626         db::initiated_submit_success(&mut db, 1, &Timestamp::now(), 12)
   1627             .await
   1628             .unwrap();
   1629     }
   1630 
   1631     #[tokio::test]
   1632     async fn batch() {
   1633         let (mut db, pool) = setup().await;
   1634         let start = Timestamp::now();
   1635         let magnet_payto =
   1636             magnet_payto("payto://iban/HU30162000031000163100000000?receiver-name=name");
   1637 
   1638         // Empty db
   1639         let pendings = db::pending_batch(&mut db, &start)
   1640             .await
   1641             .expect("pending_batch");
   1642         assert_eq!(pendings.len(), 0);
   1643 
   1644         // Some transfers
   1645         for i in 0..3 {
   1646             make_transfer(
   1647                 &pool,
   1648                 &db::Transfer {
   1649                     request_uid: HashCode::rand(),
   1650                     amount: decimal(format!("{}", i + 1)),
   1651                     exchange_base_url: url("https://exchange.test.com/"),
   1652                     metadata: None,
   1653                     wtid: ShortHashCode::rand(),
   1654                     creditor: magnet_payto.clone(),
   1655                 },
   1656                 &Timestamp::now(),
   1657             )
   1658             .await
   1659             .expect("transfer");
   1660         }
   1661         let pendings = db::pending_batch(&mut db, &start)
   1662             .await
   1663             .expect("pending_batch");
   1664         assert_eq!(pendings.len(), 3);
   1665 
   1666         // Max 100 txs in batch
   1667         for i in 0..100 {
   1668             make_transfer(
   1669                 &pool,
   1670                 &db::Transfer {
   1671                     request_uid: HashCode::rand(),
   1672                     amount: decimal(format!("{}", i + 1)),
   1673                     exchange_base_url: url("https://exchange.test.com/"),
   1674                     metadata: None,
   1675                     wtid: ShortHashCode::rand(),
   1676                     creditor: magnet_payto.clone(),
   1677                 },
   1678                 &Timestamp::now(),
   1679             )
   1680             .await
   1681             .expect("transfer");
   1682         }
   1683         let pendings = db::pending_batch(&mut db, &start)
   1684             .await
   1685             .expect("pending_batch");
   1686         assert_eq!(pendings.len(), 100);
   1687 
   1688         // Skip uploaded
   1689         for i in 0..=10 {
   1690             db::initiated_submit_success(&mut db, i, &Timestamp::now(), i)
   1691                 .await
   1692                 .expect("status success");
   1693         }
   1694         let pendings = db::pending_batch(&mut db, &start)
   1695             .await
   1696             .expect("pending_batch");
   1697         assert_eq!(pendings.len(), 93);
   1698 
   1699         // Skip failed
   1700         for i in 0..=10 {
   1701             db::initiated_submit_permanent_failure(&mut db, 10 + i, &Timestamp::now(), "failure")
   1702                 .await
   1703                 .expect("status failure");
   1704         }
   1705         let pendings = db::pending_batch(&mut db, &start)
   1706             .await
   1707             .expect("pending_batch");
   1708         assert_eq!(pendings.len(), 83);
   1709     }
   1710 }