taler-rust

GNU Taler code in Rust. Largely core banking integrations.
Log | Files | Refs | Submodules | README | LICENSE

db.rs (49583B)


      1 /*
      2   This file is part of TALER
      3   Copyright (C) 2025, 2026 Taler Systems SA
      4 
      5   TALER is free software; you can redistribute it and/or modify it under the
      6   terms of the GNU Affero General Public License as published by the Free Software
      7   Foundation; either version 3, or (at your option) any later version.
      8 
      9   TALER is distributed in the hope that it will be useful, but WITHOUT ANY
     10   WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
     11   A PARTICULAR PURPOSE.  See the GNU Affero General Public License for more details.
     12 
     13   You should have received a copy of the GNU Affero General Public License along with
     14   TALER; see the file COPYING.  If not, see <http://www.gnu.org/licenses/>
     15 */
     16 
     17 use std::fmt::Display;
     18 
     19 use compact_str::CompactString;
     20 use jiff::Timestamp;
     21 use serde::{Serialize, de::DeserializeOwned};
     22 use sqlx::{PgConnection, PgPool, QueryBuilder, Row, postgres::PgRow};
     23 use taler_api::{
     24     db::{BindHelper, TypeHelper, history, page},
     25     serialized,
     26     subject::{IncomingSubject, OutgoingSubject, fmt_out_subject},
     27 };
     28 use taler_common::{
     29     api_common::{HashCode, ShortHashCode},
     30     api_params::{History, Page},
     31     api_revenue::RevenueIncomingBankTransaction,
     32     api_transfer::{RegistrationRequest, Unregistration},
     33     api_wire::{
     34         IncomingBankTransaction, OutgoingBankTransaction, TransferListStatus, TransferState,
     35         TransferStatus,
     36     },
     37     config::Config,
     38     db::IncomingType,
     39     types::{
     40         amount::{Currency, Decimal},
     41         payto::{PaytoImpl as _, PaytoURI},
     42     },
     43 };
     44 use tokio::sync::watch::{Receiver, Sender};
     45 use url::Url;
     46 
     47 use crate::{
     48     config::parse_db_cfg,
     49     payto::{CyclosAccount, CyclosId, FullCyclosPayto},
     50 };
     51 
     52 const SCHEMA: &str = "cyclos";
     53 
     54 pub async fn pool(cfg: &Config) -> anyhow::Result<PgPool> {
     55     let db = parse_db_cfg(cfg)?;
     56     let pool = taler_common::db::pool(db.cfg, SCHEMA).await?;
     57     Ok(pool)
     58 }
     59 
     60 pub async fn dbinit(cfg: &Config, reset: bool) -> anyhow::Result<PgPool> {
     61     let db_cfg = parse_db_cfg(cfg)?;
     62     let pool = taler_common::db::pool(db_cfg.cfg, SCHEMA).await?;
     63     let mut db = pool.acquire().await?;
     64     taler_common::db::dbinit(&mut db, db_cfg.sql_dir.as_ref(), SCHEMA, reset).await?;
     65     Ok(pool)
     66 }
     67 
     68 pub async fn notification_listener(
     69     pool: PgPool,
     70     in_channel: Sender<i64>,
     71     taler_in_channel: Sender<i64>,
     72     out_channel: Sender<i64>,
     73     taler_out_channel: Sender<i64>,
     74 ) -> sqlx::Result<()> {
     75     taler_api::notification::notification_listener!(&pool,
     76         "tx_in" => (row_id: i64) {
     77             in_channel.send_replace(row_id);
     78         },
     79         "taler_in" => (row_id: i64) {
     80             taler_in_channel.send_replace(row_id);
     81         },
     82         "tx_out" => (row_id: i64) {
     83             out_channel.send_replace(row_id);
     84         },
     85         "taler_out" => (row_id: i64) {
     86             taler_out_channel.send_replace(row_id);
     87         }
     88     )
     89 }
     90 
     91 #[derive(Debug, Clone)]
     92 pub struct TxIn {
     93     pub transfer_id: i64,
     94     pub tx_id: Option<i64>,
     95     pub amount: Decimal,
     96     pub subject: String,
     97     pub debtor_id: i64,
     98     pub debtor_name: CompactString,
     99     pub valued_at: Timestamp,
    100 }
    101 
    102 impl Display for TxIn {
    103     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
    104         let Self {
    105             transfer_id,
    106             tx_id,
    107             amount,
    108             subject,
    109             valued_at,
    110             debtor_id,
    111             debtor_name,
    112         } = self;
    113         let tx_id = match tx_id {
    114             Some(id) => format_args!(":{}", *id),
    115             None => format_args!(""),
    116         };
    117         write!(
    118             f,
    119             "{valued_at} {transfer_id}{tx_id} {amount} ({debtor_id} {debtor_name}) '{subject}'"
    120         )
    121     }
    122 }
    123 
    124 #[derive(Debug, Clone)]
    125 pub struct TxOut {
    126     pub transfer_id: i64,
    127     pub tx_id: Option<i64>,
    128     pub amount: Decimal,
    129     pub subject: String,
    130     pub creditor_id: i64,
    131     pub creditor_name: CompactString,
    132     pub valued_at: Timestamp,
    133 }
    134 
    135 impl Display for TxOut {
    136     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
    137         let Self {
    138             transfer_id,
    139             tx_id,
    140             amount,
    141             subject,
    142             creditor_id,
    143             creditor_name,
    144             valued_at,
    145         } = self;
    146         let tx_id = match tx_id {
    147             Some(id) => format_args!(":{}", *id),
    148             None => format_args!(""),
    149         };
    150         write!(
    151             f,
    152             "{valued_at} {transfer_id}{tx_id} {amount} ({creditor_id} {creditor_name}) '{subject}'"
    153         )
    154     }
    155 }
    156 
    157 #[derive(Debug, PartialEq, Eq)]
    158 pub struct Initiated {
    159     pub id: i64,
    160     pub amount: Decimal,
    161     pub subject: String,
    162     pub creditor_id: i64,
    163     pub creditor_name: CompactString,
    164 }
    165 
    166 impl Display for Initiated {
    167     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
    168         let Self {
    169             id,
    170             amount,
    171             subject,
    172             creditor_id,
    173             creditor_name,
    174         } = self;
    175         write!(
    176             f,
    177             "{id} {amount} ({creditor_id} {creditor_name}) '{subject}'"
    178         )
    179     }
    180 }
    181 
    182 #[derive(Debug, Clone)]
    183 pub struct TxInAdmin {
    184     pub amount: Decimal,
    185     pub subject: String,
    186     pub debtor_id: i64,
    187     pub debtor_name: CompactString,
    188     pub metadata: IncomingSubject,
    189 }
    190 
    191 #[derive(Debug, PartialEq, Eq)]
    192 pub enum AddIncomingResult {
    193     Success {
    194         new: bool,
    195         pending: bool,
    196         row_id: u64,
    197         valued_at: Timestamp,
    198     },
    199     ReservePubReuse,
    200     UnknownMapping,
    201     MappingReuse,
    202 }
    203 
    204 /// Lock the database for worker execution
    205 pub async fn worker_lock(e: &mut PgConnection) -> sqlx::Result<bool> {
    206     sqlx::query("SELECT pg_try_advisory_lock(42)")
    207         .try_map(|r: PgRow| r.try_get(0))
    208         .fetch_one(e)
    209         .await
    210 }
    211 
    212 pub async fn register_tx_in_admin(
    213     db: &PgPool,
    214     tx: &TxInAdmin,
    215     now: &Timestamp,
    216 ) -> sqlx::Result<AddIncomingResult> {
    217     serialized!(
    218         sqlx::query(
    219             "
    220             SELECT out_reserve_pub_reuse, out_mapping_reuse, out_unknown_mapping, out_tx_row_id, out_valued_at, out_new, out_pending
    221             FROM register_tx_in(NULL, NULL, $1, $2, $3, $4, $5, $6, $7, $5)
    222         ",
    223         )
    224         .bind(tx.amount)
    225         .bind(&tx.subject)
    226         .bind(tx.debtor_id)
    227         .bind(&tx.debtor_name)
    228         .bind_timestamp(now)
    229         .bind(tx.metadata.ty())
    230         .bind(tx.metadata.key())
    231        .try_map(|r: PgRow| {
    232             Ok(if r.try_get_flag(0)? {
    233                 AddIncomingResult::ReservePubReuse
    234             } else if r.try_get_flag(1)? {
    235                 AddIncomingResult::MappingReuse
    236             } else if r.try_get_flag(2)? {
    237                 AddIncomingResult::UnknownMapping
    238             } else {
    239                 AddIncomingResult::Success {
    240                     row_id: r.try_get_u64(3)?,
    241                     valued_at: r.try_get_timestamp(4)?,
    242                     new: r.try_get(5)?,
    243                     pending: r.try_get(6)?
    244                 }
    245             })
    246         })
    247         .fetch_one(db)
    248     )
    249 }
    250 
    251 pub async fn register_tx_in(
    252     db: &mut PgConnection,
    253     tx: &TxIn,
    254     subject: &Option<IncomingSubject>,
    255     now: &Timestamp,
    256 ) -> sqlx::Result<AddIncomingResult> {
    257     serialized!(
    258         sqlx::query(
    259             "
    260             SELECT out_reserve_pub_reuse, out_mapping_reuse, out_unknown_mapping, out_tx_row_id, out_valued_at, out_new, out_pending
    261             FROM register_tx_in($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
    262         ",
    263         )
    264         .bind(tx.transfer_id)
    265         .bind(tx.tx_id)
    266         .bind(tx.amount)
    267         .bind(&tx.subject)
    268         .bind(tx.debtor_id)
    269         .bind(&tx.debtor_name)
    270         .bind(tx.valued_at.as_microsecond())
    271         .bind(subject.as_ref().map(|it| it.ty()))
    272         .bind(subject.as_ref().map(|it| it.key()))
    273         .bind(now.as_microsecond())
    274         .try_map(|r: PgRow| {
    275             Ok(if r.try_get_flag(0)? {
    276                 AddIncomingResult::ReservePubReuse
    277             } else if r.try_get_flag(1)? {
    278                 AddIncomingResult::MappingReuse
    279             } else if r.try_get_flag(2)? {
    280                 AddIncomingResult::UnknownMapping
    281             } else {
    282                 AddIncomingResult::Success {
    283                     row_id: r.try_get_u64(3)?,
    284                     valued_at: r.try_get_timestamp(4)?,
    285                     new: r.try_get(5)?,
    286                     pending: r.try_get(6)?
    287                 }
    288             })
    289         })
    290         .fetch_one(&mut *db)
    291     )
    292 }
    293 
    294 #[derive(Debug)]
    295 pub enum TxOutKind {
    296     Simple,
    297     Bounce(i64),
    298     Talerable(OutgoingSubject),
    299 }
    300 
    301 #[derive(Debug, Clone, Copy, PartialEq, Eq, sqlx::Type)]
    302 #[allow(non_camel_case_types)]
    303 #[sqlx(type_name = "register_result")]
    304 pub enum RegisterResult {
    305     /// Already registered
    306     idempotent,
    307     /// Initiated transaction
    308     known,
    309     /// Recovered unknown outgoing transaction
    310     recovered,
    311 }
    312 
    313 #[derive(Debug, PartialEq, Eq)]
    314 pub struct AddOutgoingResult {
    315     pub result: RegisterResult,
    316     pub row_id: i64,
    317 }
    318 
    319 pub async fn register_tx_out(
    320     db: &mut PgConnection,
    321     tx: &TxOut,
    322     kind: &TxOutKind,
    323     now: &Timestamp,
    324 ) -> sqlx::Result<AddOutgoingResult> {
    325     serialized!({
    326         let query = sqlx::query(
    327             "
    328                 SELECT out_result, out_tx_row_id
    329                 FROM register_tx_out($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)
    330             ",
    331         )
    332         .bind(tx.transfer_id)
    333         .bind(tx.tx_id)
    334         .bind(tx.amount)
    335         .bind(&tx.subject)
    336         .bind(tx.creditor_id)
    337         .bind(&tx.creditor_name)
    338         .bind_timestamp(&tx.valued_at);
    339         let query = match kind {
    340             TxOutKind::Simple => query
    341                 .bind(None::<&[u8]>)
    342                 .bind(None::<&str>)
    343                 .bind(None::<&str>)
    344                 .bind(None::<i64>),
    345             TxOutKind::Bounce(bounced) => query
    346                 .bind(None::<&[u8]>)
    347                 .bind(None::<&str>)
    348                 .bind(None::<&str>)
    349                 .bind(*bounced),
    350             TxOutKind::Talerable(subject) => query
    351                 .bind(&subject.wtid)
    352                 .bind(subject.exchange_base_url.as_str())
    353                 .bind(&subject.metadata)
    354                 .bind(None::<i64>),
    355         };
    356         query
    357             .bind_timestamp(now)
    358             .try_map(|r: PgRow| {
    359                 Ok(AddOutgoingResult {
    360                     result: r.try_get(0)?,
    361                     row_id: r.try_get(1)?,
    362                 })
    363             })
    364             .fetch_one(&mut *db)
    365     })
    366 }
    367 
    368 #[derive(Debug, PartialEq, Eq)]
    369 pub enum TransferResult {
    370     Success { id: u64, initiated_at: Timestamp },
    371     RequestUidReuse,
    372     WtidReuse,
    373 }
    374 
    375 #[derive(Debug, Clone)]
    376 pub struct Transfer {
    377     pub request_uid: HashCode,
    378     pub amount: Decimal,
    379     pub exchange_base_url: Url,
    380     pub metadata: Option<CompactString>,
    381     pub wtid: ShortHashCode,
    382     pub creditor_id: i64,
    383     pub creditor_name: CompactString,
    384 }
    385 
    386 pub async fn make_transfer(
    387     db: &PgPool,
    388     tx: &Transfer,
    389     now: &Timestamp,
    390 ) -> sqlx::Result<TransferResult> {
    391     let subject = fmt_out_subject(&tx.wtid, &tx.exchange_base_url, tx.metadata.as_deref());
    392     serialized!(
    393         sqlx::query(
    394             "
    395                 SELECT out_request_uid_reuse, out_wtid_reuse, out_initiated_row_id, out_initiated_at
    396                 FROM taler_transfer($1, $2, $3, $4, $5, $6, $7, $8, $9)
    397             ",
    398         )
    399         .bind(&tx.request_uid)
    400         .bind(&tx.wtid)
    401         .bind(&subject)
    402         .bind(tx.amount)
    403         .bind(tx.exchange_base_url.as_str())
    404         .bind(&tx.metadata)
    405         .bind(tx.creditor_id)
    406         .bind(&tx.creditor_name)
    407         .bind_timestamp(now)
    408         .try_map(|r: PgRow| {
    409             Ok(if r.try_get_flag(0)? {
    410                 TransferResult::RequestUidReuse
    411             } else if r.try_get_flag(1)? {
    412                 TransferResult::WtidReuse
    413             } else {
    414                 TransferResult::Success {
    415                     id: r.try_get_u64(2)?,
    416                     initiated_at: r.try_get_timestamp(3)?,
    417                 }
    418             })
    419         })
    420         .fetch_one(db)
    421     )
    422 }
    423 
    424 #[derive(Debug, PartialEq, Eq)]
    425 pub struct BounceResult {
    426     pub tx_id: u64,
    427     pub tx_new: bool,
    428 }
    429 
    430 pub async fn register_bounced_tx_in(
    431     db: &mut PgConnection,
    432     tx: &TxIn,
    433     chargeback_id: i64,
    434     reason: &str,
    435     now: &Timestamp,
    436 ) -> sqlx::Result<BounceResult> {
    437     serialized!(
    438         sqlx::query(
    439             "
    440                 SELECT out_tx_row_id, out_tx_new
    441                 FROM register_bounced_tx_in($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
    442             ",
    443         )
    444         .bind(tx.transfer_id)
    445         .bind(tx.tx_id)
    446         .bind(tx.amount)
    447         .bind(&tx.subject)
    448         .bind(tx.debtor_id)
    449         .bind(&tx.debtor_name)
    450         .bind_timestamp(&tx.valued_at)
    451         .bind(chargeback_id)
    452         .bind(reason)
    453         .bind_timestamp(now)
    454         .try_map(|r: PgRow| {
    455             Ok(BounceResult {
    456                 tx_id: r.try_get_u64(0)?,
    457                 tx_new: r.try_get(1)?,
    458             })
    459         })
    460         .fetch_one(&mut *db)
    461     )
    462 }
    463 
    464 pub async fn transfer_page(
    465     db: &PgPool,
    466     status: &Option<TransferState>,
    467     currency: &Currency,
    468     root: &CompactString,
    469     params: &Page,
    470 ) -> sqlx::Result<Vec<TransferListStatus>> {
    471     page(
    472         db,
    473         "initiated_id",
    474         params,
    475         || {
    476             let mut builder = QueryBuilder::new(
    477                 "
    478                     SELECT
    479                         initiated_id,
    480                         status,
    481                         amount,
    482                         credit_account,
    483                         credit_name,
    484                         initiated_at
    485                     FROM transfer
    486                     JOIN initiated USING (initiated_id)
    487                     WHERE
    488                 ",
    489             );
    490             if let Some(status) = status {
    491                 builder.push(" status = ").push_bind(status).push(" AND ");
    492             }
    493             builder
    494         },
    495         |r: PgRow| {
    496             Ok(TransferListStatus {
    497                 row_id: r.try_get_safeu64(0)?,
    498                 status: r.try_get(1)?,
    499                 amount: r.try_get_amount(2, currency)?,
    500                 credit_account: r.try_get_cyclos_fullpaytouri(3, 4, root)?,
    501                 timestamp: r.try_get_timestamp(5)?.into(),
    502             })
    503         },
    504     )
    505     .await
    506 }
    507 
    508 pub async fn outgoing_history(
    509     db: &PgPool,
    510     params: &History,
    511     currency: &Currency,
    512     root: &CompactString,
    513     listen: impl FnOnce() -> Receiver<i64>,
    514 ) -> sqlx::Result<Vec<OutgoingBankTransaction>> {
    515     history(
    516         db,
    517         "tx_out_id",
    518         params,
    519         listen,
    520         || {
    521             QueryBuilder::new(
    522                 "
    523                 SELECT
    524                     tx_out_id,
    525                     amount,
    526                     credit_account,
    527                     credit_name,
    528                     valued_at,
    529                     exchange_base_url,
    530                     metadata,
    531                     wtid
    532                 FROM taler_out
    533                 JOIN tx_out USING (tx_out_id)
    534                 WHERE
    535             ",
    536             )
    537         },
    538         |r: PgRow| {
    539             Ok(OutgoingBankTransaction {
    540                 row_id: r.try_get_safeu64(0)?,
    541                 amount: r.try_get_amount(1, currency)?,
    542                 debit_fee: None,
    543                 credit_account: r.try_get_cyclos_fullpaytouri(2, 3, root)?,
    544                 date: r.try_get_timestamp(4)?.into(),
    545                 exchange_base_url: r.try_get_url(5)?,
    546                 metadata: r.try_get(6)?,
    547                 wtid: r.try_get(7)?,
    548             })
    549         },
    550     )
    551     .await
    552 }
    553 
    554 pub async fn incoming_history(
    555     db: &PgPool,
    556     params: &History,
    557     currency: &Currency,
    558     root: &CompactString,
    559     listen: impl FnOnce() -> Receiver<i64>,
    560 ) -> sqlx::Result<Vec<IncomingBankTransaction>> {
    561     history(
    562         db,
    563         "tx_in_id",
    564         params,
    565         listen,
    566         || {
    567             QueryBuilder::new(
    568                 "
    569                 SELECT
    570                     type,
    571                     tx_in_id,
    572                     amount,
    573                     debit_account,
    574                     debit_name,
    575                     valued_at,
    576                     metadata,
    577                     authorization_pub,
    578                     authorization_sig
    579                 FROM taler_in
    580                 JOIN tx_in USING (tx_in_id)
    581                 WHERE
    582             ",
    583             )
    584         },
    585         |r: PgRow| {
    586             Ok(match r.try_get(0)? {
    587                 IncomingType::reserve => IncomingBankTransaction::Reserve {
    588                     row_id: r.try_get_safeu64(1)?,
    589                     amount: r.try_get_amount(2, currency)?,
    590                     credit_fee: None,
    591                     debit_account: r.try_get_cyclos_fullpaytouri(3, 4, root)?,
    592                     date: r.try_get_timestamp(5)?.into(),
    593                     reserve_pub: r.try_get(6)?,
    594                     authorization_pub: r.try_get(7)?,
    595                     authorization_sig: r.try_get(8)?,
    596                 },
    597                 IncomingType::kyc => IncomingBankTransaction::Kyc {
    598                     row_id: r.try_get_safeu64(1)?,
    599                     amount: r.try_get_amount(2, currency)?,
    600                     credit_fee: None,
    601                     debit_account: r.try_get_cyclos_fullpaytouri(3, 4, root)?,
    602                     date: r.try_get_timestamp(5)?.into(),
    603                     account_pub: r.try_get(6)?,
    604                     authorization_pub: r.try_get(7)?,
    605                     authorization_sig: r.try_get(8)?,
    606                 },
    607                 IncomingType::map => unimplemented!("MAP are never listed in the history"),
    608             })
    609         },
    610     )
    611     .await
    612 }
    613 
    614 pub async fn revenue_history(
    615     db: &PgPool,
    616     params: &History,
    617     currency: &Currency,
    618     root: &CompactString,
    619     listen: impl FnOnce() -> Receiver<i64>,
    620 ) -> sqlx::Result<Vec<RevenueIncomingBankTransaction>> {
    621     history(
    622         db,
    623         "tx_in_id",
    624         params,
    625         listen,
    626         || {
    627             QueryBuilder::new(
    628                 "
    629                 SELECT
    630                     tx_in_id,
    631                     valued_at,
    632                     amount,
    633                     debit_account,
    634                     debit_name,
    635                     subject
    636                 FROM tx_in
    637                 WHERE
    638             ",
    639             )
    640         },
    641         |r: PgRow| {
    642             Ok(RevenueIncomingBankTransaction {
    643                 row_id: r.try_get_safeu64(0)?,
    644                 date: r.try_get_timestamp(1)?.into(),
    645                 amount: r.try_get_amount(2, currency)?,
    646                 credit_fee: None,
    647                 debit_account: r.try_get_cyclos_fullpaytouri(3, 4, root)?,
    648                 subject: r.try_get(5)?,
    649             })
    650         },
    651     )
    652     .await
    653 }
    654 
    655 pub async fn transfer_by_id(
    656     db: &PgPool,
    657     id: u64,
    658     currency: &Currency,
    659     root: &CompactString,
    660 ) -> sqlx::Result<Option<TransferStatus>> {
    661     serialized!(
    662         sqlx::query(
    663             "
    664                 SELECT
    665                     status,
    666                     status_msg,
    667                     amount,
    668                     exchange_base_url,
    669                     metadata,
    670                     wtid,
    671                     credit_account,
    672                     credit_name,
    673                     initiated_at
    674                 FROM transfer
    675                 JOIN initiated USING (initiated_id)
    676                 WHERE initiated_id = $1
    677             ",
    678         )
    679         .bind(id as i64)
    680         .try_map(|r: PgRow| {
    681             Ok(TransferStatus {
    682                 status: r.try_get(0)?,
    683                 status_msg: r.try_get(1)?,
    684                 amount: r.try_get_amount(2, currency)?,
    685                 origin_exchange_url: r.try_get(3)?,
    686                 metadata: r.try_get(4)?,
    687                 wtid: r.try_get(5)?,
    688                 credit_account: r.try_get_cyclos_fullpaytouri(6, 7, root)?,
    689                 timestamp: r.try_get_timestamp(8)?.into(),
    690             })
    691         })
    692         .fetch_optional(db)
    693     )
    694 }
    695 
    696 /** Get a batch of pending initiated transactions not attempted since [start] */
    697 pub async fn pending_batch(
    698     db: &mut PgConnection,
    699     start: &Timestamp,
    700 ) -> sqlx::Result<Vec<Initiated>> {
    701     serialized!(
    702         sqlx::query(
    703             "
    704             SELECT initiated_id, amount, subject, credit_account, credit_name
    705             FROM initiated
    706             WHERE tx_id IS NULL
    707                 AND status='pending'
    708                 AND (last_submitted IS NULL OR last_submitted < $1)
    709             LIMIT 100
    710         ",
    711         )
    712         .bind_timestamp(start)
    713         .try_map(|r: PgRow| {
    714             Ok(Initiated {
    715                 id: r.try_get(0)?,
    716                 amount: r.try_get(1)?,
    717                 subject: r.try_get(2)?,
    718                 creditor_id: r.try_get(3)?,
    719                 creditor_name: r.try_get(4)?,
    720             })
    721         })
    722         .fetch_all(&mut *db)
    723     )
    724 }
    725 
    726 /** Update status of a successful submitted initiated transaction */
    727 pub async fn initiated_submit_success(
    728     db: &mut PgConnection,
    729     initiated_id: i64,
    730     timestamp: &Timestamp,
    731     tx_id: i64,
    732 ) -> sqlx::Result<()> {
    733     serialized!(
    734         sqlx::query(
    735             "
    736                 UPDATE initiated
    737                 SET status='pending', submission_counter=submission_counter+1, last_submitted=$1, tx_id=$2
    738                 WHERE initiated_id=$3
    739             "
    740         )
    741         .bind_timestamp(timestamp)
    742         .bind(tx_id)
    743         .bind(initiated_id)
    744         .execute(&mut *db)
    745     )?;
    746     Ok(())
    747 }
    748 
    749 /** Update status of a permanently failed initiated transaction */
    750 pub async fn initiated_submit_permanent_failure(
    751     db: &mut PgConnection,
    752     initiated_id: i64,
    753     msg: &str,
    754 ) -> sqlx::Result<()> {
    755     serialized!(
    756         sqlx::query(
    757             "
    758                 UPDATE initiated
    759                 SET status='permanent_failure', status_msg=$1
    760                 WHERE initiated_id=$2
    761             ",
    762         )
    763         .bind(msg)
    764         .bind(initiated_id)
    765         .execute(&mut *db)
    766     )?;
    767     Ok(())
    768 }
    769 
    770 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
    771 pub enum ChargebackFailureResult {
    772     Unknown,
    773     Known(u64),
    774     Idempotent(u64),
    775 }
    776 
    777 /** Update status of a charged back initiated transaction */
    778 pub async fn initiated_chargeback_failure(
    779     db: &mut PgConnection,
    780     transfer_id: i64,
    781 ) -> sqlx::Result<ChargebackFailureResult> {
    782     Ok(serialized!(
    783         sqlx::query("SELECT out_initiated_id, out_new FROM register_charge_back_failure($1)")
    784             .bind(transfer_id)
    785             .try_map(|r: PgRow| {
    786                 let id = r.try_get_u64(0)?;
    787                 Ok(if id == 0 {
    788                     ChargebackFailureResult::Unknown
    789                 } else if r.try_get(1)? {
    790                     ChargebackFailureResult::Known(id)
    791                 } else {
    792                     ChargebackFailureResult::Idempotent(id)
    793                 })
    794             })
    795             .fetch_optional(&mut *db)
    796     )?
    797     .unwrap_or(ChargebackFailureResult::Unknown))
    798 }
    799 
    800 /** Get JSON value from KV table */
    801 pub async fn kv_get<T: DeserializeOwned + Unpin + Send>(
    802     db: &mut PgConnection,
    803     key: &str,
    804 ) -> sqlx::Result<Option<T>> {
    805     serialized!(
    806         sqlx::query("SELECT value FROM kv WHERE key=$1")
    807             .bind(key)
    808             .try_map(|r| Ok(r.try_get::<sqlx::types::Json<T>, _>(0)?.0))
    809             .fetch_optional(&mut *db)
    810     )
    811 }
    812 
    813 /** Set JSON value in KV table */
    814 pub async fn kv_set<T: Serialize>(db: &mut PgConnection, key: &str, value: &T) -> sqlx::Result<()> {
    815     serialized!(
    816         sqlx::query("INSERT INTO kv (key, value) VALUES ($1, $2) ON CONFLICT (key) DO UPDATE SET value=EXCLUDED.value")
    817             .bind(key)
    818             .bind(sqlx::types::Json(value))
    819             .execute(&mut *db)
    820     )?;
    821     Ok(())
    822 }
    823 
    824 pub enum RegistrationResult {
    825     Success,
    826     ReservePubReuse,
    827 }
    828 
    829 pub async fn transfer_register(
    830     db: &PgPool,
    831     req: &RegistrationRequest,
    832 ) -> sqlx::Result<RegistrationResult> {
    833     let ty: IncomingType = req.r#type.into();
    834     serialized!(
    835         sqlx::query(
    836             "SELECT out_reserve_pub_reuse FROM register_prepared_transfers($1,$2,$3,$4,$5,$6)"
    837         )
    838         .bind(ty)
    839         .bind(&req.account_pub)
    840         .bind(&req.authorization_pub)
    841         .bind(&req.authorization_sig)
    842         .bind(req.recurrent)
    843         .bind_timestamp(&Timestamp::now())
    844         .try_map(|r: PgRow| {
    845             Ok(if r.try_get_flag("out_reserve_pub_reuse")? {
    846                 RegistrationResult::ReservePubReuse
    847             } else {
    848                 RegistrationResult::Success
    849             })
    850         })
    851         .fetch_one(db)
    852     )
    853 }
    854 
    855 pub async fn transfer_unregister(db: &PgPool, req: &Unregistration) -> sqlx::Result<bool> {
    856     serialized!(
    857         sqlx::query("SELECT out_found FROM delete_prepared_transfers($1)")
    858             .bind(&req.authorization_pub)
    859             .try_map(|r: PgRow| r.try_get_flag("out_found"))
    860             .fetch_one(db)
    861     )
    862 }
    863 
    864 pub trait CyclosTypeHelper {
    865     fn try_get_cyclos_fullpayto<I: sqlx::ColumnIndex<Self>>(
    866         &self,
    867         idx: I,
    868         name: I,
    869         root: &CompactString,
    870     ) -> sqlx::Result<FullCyclosPayto>;
    871     fn try_get_cyclos_fullpaytouri<I: sqlx::ColumnIndex<Self>>(
    872         &self,
    873         idx: I,
    874         name: I,
    875         root: &CompactString,
    876     ) -> sqlx::Result<PaytoURI>;
    877 }
    878 
    879 impl CyclosTypeHelper for PgRow {
    880     fn try_get_cyclos_fullpayto<I: sqlx::ColumnIndex<Self>>(
    881         &self,
    882         idx: I,
    883         name: I,
    884         root: &CompactString,
    885     ) -> sqlx::Result<FullCyclosPayto> {
    886         let idx = self.try_get(idx)?;
    887         let name = self.try_get(name)?;
    888         Ok(FullCyclosPayto::new(
    889             CyclosAccount {
    890                 id: CyclosId(idx),
    891                 root: root.clone(),
    892             },
    893             name,
    894         ))
    895     }
    896     fn try_get_cyclos_fullpaytouri<I: sqlx::ColumnIndex<Self>>(
    897         &self,
    898         idx: I,
    899         name: I,
    900         root: &CompactString,
    901     ) -> sqlx::Result<PaytoURI> {
    902         let idx = self.try_get(idx)?;
    903         let name = self.try_get(name)?;
    904         Ok(CyclosAccount {
    905             id: CyclosId(idx),
    906             root: root.clone(),
    907         }
    908         .as_full_payto(name))
    909     }
    910 }
    911 
    912 #[cfg(test)]
    913 mod test {
    914     use std::sync::LazyLock;
    915 
    916     use compact_str::CompactString;
    917     use jiff::{Span, Timestamp};
    918     use serde_json::json;
    919     use sqlx::{PgPool, Postgres, Row as _, pool::PoolConnection, postgres::PgRow};
    920     use taler_api::{
    921         db::TypeHelper,
    922         notification::dummy_listen,
    923         subject::{IncomingSubject, OutgoingSubject},
    924     };
    925     use taler_common::{
    926         api_common::{EddsaPublicKey, HashCode, ShortHashCode},
    927         api_params::{History, Page},
    928         api_wire::TransferState,
    929         types::{
    930             amount::{Currency, decimal},
    931             url,
    932             utils::now_sql_stable_ts,
    933         },
    934     };
    935 
    936     use crate::{
    937         constants::CONFIG_SOURCE,
    938         db::{
    939             self, AddIncomingResult, AddOutgoingResult, BounceResult, ChargebackFailureResult,
    940             Transfer, TransferResult, TxIn, TxInAdmin, TxOut, TxOutKind,
    941         },
    942     };
    943 
    944     pub static CURRENCY: LazyLock<Currency> = LazyLock::new(|| "TEST".parse().unwrap());
    945     pub const ROOT: CompactString = CompactString::const_new("localhost");
    946 
    947     async fn setup() -> (PoolConnection<Postgres>, PgPool) {
    948         taler_test_utils::db::db_test_setup(CONFIG_SOURCE).await
    949     }
    950 
    951     #[tokio::test]
    952     async fn kv() {
    953         let (mut db, _) = setup().await;
    954 
    955         let value = json!({
    956             "name": "Mr Smith",
    957             "no way": 32
    958         });
    959 
    960         assert_eq!(
    961             db::kv_get::<serde_json::Value>(&mut db, "value")
    962                 .await
    963                 .unwrap(),
    964             None
    965         );
    966         db::kv_set(&mut db, "value", &value).await.unwrap();
    967         db::kv_set(&mut db, "value", &value).await.unwrap();
    968         assert_eq!(
    969             db::kv_get::<serde_json::Value>(&mut db, "value")
    970                 .await
    971                 .unwrap(),
    972             Some(value)
    973         );
    974     }
    975 
    976     #[tokio::test]
    977     async fn tx_in() {
    978         let (mut db, pool) = setup().await;
    979 
    980         let mut routine = async |first: &Option<IncomingSubject>,
    981                                  second: &Option<IncomingSubject>| {
    982             let id = sqlx::query("SELECT count(*) + 1 FROM tx_in")
    983                 .try_map(|r: PgRow| r.try_get_u64(0))
    984                 .fetch_one(&mut *db)
    985                 .await
    986                 .unwrap();
    987             let now = now_sql_stable_ts();
    988             let later = now + Span::new().hours(2);
    989             let tx = TxIn {
    990                 transfer_id: now.as_microsecond() as i64,
    991                 tx_id: None,
    992                 amount: decimal("10"),
    993                 subject: "subject".to_owned(),
    994                 debtor_id: 31000163100000000,
    995                 debtor_name: "Name".into(),
    996                 valued_at: now,
    997             };
    998             // Insert
    999             assert_eq!(
   1000                 db::register_tx_in(&mut db, &tx, first, &now)
   1001                     .await
   1002                     .expect("register tx in"),
   1003                 AddIncomingResult::Success {
   1004                     new: true,
   1005                     pending: false,
   1006                     row_id: id,
   1007                     valued_at: now,
   1008                 }
   1009             );
   1010             // Idempotent
   1011             assert_eq!(
   1012                 db::register_tx_in(
   1013                     &mut db,
   1014                     &TxIn {
   1015                         valued_at: later,
   1016                         ..tx.clone()
   1017                     },
   1018                     first,
   1019                     &now
   1020                 )
   1021                 .await
   1022                 .expect("register tx in"),
   1023                 AddIncomingResult::Success {
   1024                     new: false,
   1025                     pending: false,
   1026                     row_id: id,
   1027                     valued_at: now
   1028                 }
   1029             );
   1030             // Many
   1031             assert_eq!(
   1032                 db::register_tx_in(
   1033                     &mut db,
   1034                     &TxIn {
   1035                         transfer_id: later.as_microsecond() as i64,
   1036                         valued_at: later,
   1037                         ..tx
   1038                     },
   1039                     second,
   1040                     &now
   1041                 )
   1042                 .await
   1043                 .expect("register tx in"),
   1044                 AddIncomingResult::Success {
   1045                     new: true,
   1046                     pending: false,
   1047                     row_id: id + 1,
   1048                     valued_at: later
   1049                 }
   1050             );
   1051         };
   1052 
   1053         // Empty db
   1054         assert_eq!(
   1055             db::revenue_history(&pool, &History::default(), &CURRENCY, &ROOT, dummy_listen)
   1056                 .await
   1057                 .unwrap(),
   1058             Vec::new()
   1059         );
   1060         assert_eq!(
   1061             db::incoming_history(&pool, &History::default(), &CURRENCY, &ROOT, dummy_listen)
   1062                 .await
   1063                 .unwrap(),
   1064             Vec::new()
   1065         );
   1066 
   1067         // Regular transaction
   1068         routine(&None, &None).await;
   1069 
   1070         // Reserve transaction
   1071         routine(
   1072             &Some(IncomingSubject::Reserve(EddsaPublicKey::rand())),
   1073             &Some(IncomingSubject::Reserve(EddsaPublicKey::rand())),
   1074         )
   1075         .await;
   1076 
   1077         // Kyc transaction
   1078         routine(
   1079             &Some(IncomingSubject::Kyc(EddsaPublicKey::rand())),
   1080             &Some(IncomingSubject::Kyc(EddsaPublicKey::rand())),
   1081         )
   1082         .await;
   1083 
   1084         // History
   1085         assert_eq!(
   1086             db::revenue_history(&pool, &History::default(), &CURRENCY, &ROOT, dummy_listen)
   1087                 .await
   1088                 .unwrap()
   1089                 .len(),
   1090             6
   1091         );
   1092         assert_eq!(
   1093             db::incoming_history(&pool, &History::default(), &CURRENCY, &ROOT, dummy_listen)
   1094                 .await
   1095                 .unwrap()
   1096                 .len(),
   1097             4
   1098         );
   1099     }
   1100 
   1101     #[tokio::test]
   1102     async fn tx_in_admin() {
   1103         let (_, pool) = setup().await;
   1104 
   1105         // Empty db
   1106         assert_eq!(
   1107             db::incoming_history(&pool, &History::default(), &CURRENCY, &ROOT, dummy_listen)
   1108                 .await
   1109                 .unwrap(),
   1110             Vec::new()
   1111         );
   1112 
   1113         let now = now_sql_stable_ts();
   1114         let later = now + Span::new().hours(2);
   1115         let tx = TxInAdmin {
   1116             amount: decimal("10"),
   1117             subject: "subject".to_owned(),
   1118             debtor_id: 31000163100000000,
   1119             debtor_name: "Name".into(),
   1120             metadata: IncomingSubject::Reserve(EddsaPublicKey::rand()),
   1121         };
   1122         // Insert
   1123         assert_eq!(
   1124             db::register_tx_in_admin(&pool, &tx, &now)
   1125                 .await
   1126                 .expect("register tx in"),
   1127             AddIncomingResult::Success {
   1128                 new: true,
   1129                 pending: false,
   1130                 row_id: 1,
   1131                 valued_at: now
   1132             }
   1133         );
   1134         // Idempotent
   1135         assert_eq!(
   1136             db::register_tx_in_admin(&pool, &tx, &later)
   1137                 .await
   1138                 .expect("register tx in"),
   1139             AddIncomingResult::Success {
   1140                 new: false,
   1141                 pending: false,
   1142                 row_id: 1,
   1143                 valued_at: now
   1144             }
   1145         );
   1146         // Many
   1147         assert_eq!(
   1148             db::register_tx_in_admin(
   1149                 &pool,
   1150                 &TxInAdmin {
   1151                     subject: "Other".to_owned(),
   1152                     metadata: IncomingSubject::Reserve(EddsaPublicKey::rand()),
   1153                     ..tx.clone()
   1154                 },
   1155                 &later
   1156             )
   1157             .await
   1158             .expect("register tx in"),
   1159             AddIncomingResult::Success {
   1160                 new: true,
   1161                 pending: false,
   1162                 row_id: 2,
   1163                 valued_at: later
   1164             }
   1165         );
   1166 
   1167         // History
   1168         assert_eq!(
   1169             db::incoming_history(&pool, &History::default(), &CURRENCY, &ROOT, dummy_listen)
   1170                 .await
   1171                 .unwrap()
   1172                 .len(),
   1173             2
   1174         );
   1175     }
   1176 
   1177     #[tokio::test]
   1178     async fn tx_out() {
   1179         let (mut db, pool) = setup().await;
   1180 
   1181         let mut routine = async |first: &TxOutKind, second: &TxOutKind| {
   1182             let transfer_id = sqlx::query("SELECT count(*) + 1 FROM tx_out")
   1183                 .try_map(|r: PgRow| r.try_get(0))
   1184                 .fetch_one(&mut *db)
   1185                 .await
   1186                 .unwrap();
   1187             let now = now_sql_stable_ts();
   1188             let later = now + Span::new().hours(2);
   1189             let tx = TxOut {
   1190                 transfer_id,
   1191                 tx_id: Some(transfer_id),
   1192                 amount: decimal("10"),
   1193                 subject: "subject".to_owned(),
   1194                 creditor_id: 31000163100000000,
   1195                 creditor_name: "Name".into(),
   1196                 valued_at: now,
   1197             };
   1198             assert!(matches!(
   1199                 db::make_transfer(
   1200                     &pool,
   1201                     &Transfer {
   1202                         request_uid: HashCode::rand(),
   1203                         amount: decimal("10"),
   1204                         exchange_base_url: url("https://exchange.test.com/"),
   1205                         metadata: None,
   1206                         wtid: ShortHashCode::rand(),
   1207                         creditor_id: 31000163100000000,
   1208                         creditor_name: "Name".into()
   1209                     },
   1210                     &now
   1211                 )
   1212                 .await
   1213                 .unwrap(),
   1214                 TransferResult::Success { .. }
   1215             ));
   1216             db::initiated_submit_success(&mut db, 1, &Timestamp::now(), transfer_id)
   1217                 .await
   1218                 .expect("status success");
   1219 
   1220             // Insert
   1221             assert_eq!(
   1222                 db::register_tx_out(&mut db, &tx, first, &now)
   1223                     .await
   1224                     .expect("register tx out"),
   1225                 AddOutgoingResult {
   1226                     result: db::RegisterResult::known,
   1227                     row_id: transfer_id,
   1228                 }
   1229             );
   1230             // Idempotent
   1231             assert_eq!(
   1232                 db::register_tx_out(
   1233                     &mut db,
   1234                     &TxOut {
   1235                         valued_at: later,
   1236                         ..tx.clone()
   1237                     },
   1238                     first,
   1239                     &now
   1240                 )
   1241                 .await
   1242                 .expect("register tx out"),
   1243                 AddOutgoingResult {
   1244                     result: db::RegisterResult::idempotent,
   1245                     row_id: transfer_id,
   1246                 }
   1247             );
   1248             // Recovered
   1249             assert_eq!(
   1250                 db::register_tx_out(
   1251                     &mut db,
   1252                     &TxOut {
   1253                         transfer_id: transfer_id + 1,
   1254                         tx_id: Some(transfer_id + 1),
   1255                         valued_at: later,
   1256                         ..tx.clone()
   1257                     },
   1258                     second,
   1259                     &now
   1260                 )
   1261                 .await
   1262                 .expect("register tx out"),
   1263                 AddOutgoingResult {
   1264                     result: db::RegisterResult::recovered,
   1265                     row_id: transfer_id + 1,
   1266                 }
   1267             );
   1268         };
   1269 
   1270         // Empty db
   1271         assert_eq!(
   1272             db::outgoing_history(&pool, &History::default(), &CURRENCY, &ROOT, dummy_listen)
   1273                 .await
   1274                 .unwrap(),
   1275             Vec::new()
   1276         );
   1277 
   1278         // Regular transaction
   1279         routine(&TxOutKind::Simple, &TxOutKind::Simple).await;
   1280 
   1281         // Talerable transaction
   1282         routine(
   1283             &TxOutKind::Talerable(OutgoingSubject::rand()),
   1284             &TxOutKind::Talerable(OutgoingSubject::rand()),
   1285         )
   1286         .await;
   1287 
   1288         // Bounced transaction
   1289         routine(&TxOutKind::Bounce(21), &TxOutKind::Bounce(42)).await;
   1290 
   1291         // History
   1292         assert_eq!(
   1293             db::outgoing_history(&pool, &History::default(), &CURRENCY, &ROOT, dummy_listen)
   1294                 .await
   1295                 .unwrap()
   1296                 .len(),
   1297             2
   1298         );
   1299     }
   1300 
   1301     // TODO tx out failure
   1302 
   1303     #[tokio::test]
   1304     async fn transfer() {
   1305         let (_, pool) = setup().await;
   1306 
   1307         // Empty db
   1308         assert_eq!(
   1309             db::transfer_by_id(&pool, 0, &CURRENCY, &ROOT)
   1310                 .await
   1311                 .unwrap(),
   1312             None
   1313         );
   1314         assert_eq!(
   1315             db::transfer_page(&pool, &None, &CURRENCY, &ROOT, &Page::default())
   1316                 .await
   1317                 .unwrap(),
   1318             Vec::new()
   1319         );
   1320 
   1321         let req = Transfer {
   1322             request_uid: HashCode::rand(),
   1323             amount: decimal("10"),
   1324             exchange_base_url: url("https://exchange.test.com/"),
   1325             metadata: None,
   1326             wtid: ShortHashCode::rand(),
   1327             creditor_id: 31000163100000000,
   1328             creditor_name: "Name".into(),
   1329         };
   1330         let now = now_sql_stable_ts();
   1331         let later = now + Span::new().hours(2);
   1332         // Insert
   1333         assert_eq!(
   1334             db::make_transfer(&pool, &req, &now)
   1335                 .await
   1336                 .expect("transfer"),
   1337             TransferResult::Success {
   1338                 id: 1,
   1339                 initiated_at: now
   1340             }
   1341         );
   1342         // Idempotent
   1343         assert_eq!(
   1344             db::make_transfer(&pool, &req, &later)
   1345                 .await
   1346                 .expect("transfer"),
   1347             TransferResult::Success {
   1348                 id: 1,
   1349                 initiated_at: now
   1350             }
   1351         );
   1352         // Request UID reuse
   1353         assert_eq!(
   1354             db::make_transfer(
   1355                 &pool,
   1356                 &Transfer {
   1357                     wtid: ShortHashCode::rand(),
   1358                     ..req.clone()
   1359                 },
   1360                 &now
   1361             )
   1362             .await
   1363             .expect("transfer"),
   1364             TransferResult::RequestUidReuse
   1365         );
   1366         // wtid reuse
   1367         assert_eq!(
   1368             db::make_transfer(
   1369                 &pool,
   1370                 &Transfer {
   1371                     request_uid: HashCode::rand(),
   1372                     ..req.clone()
   1373                 },
   1374                 &now
   1375             )
   1376             .await
   1377             .expect("transfer"),
   1378             TransferResult::WtidReuse
   1379         );
   1380         // Many
   1381         assert_eq!(
   1382             db::make_transfer(
   1383                 &pool,
   1384                 &Transfer {
   1385                     request_uid: HashCode::rand(),
   1386                     wtid: ShortHashCode::rand(),
   1387                     ..req
   1388                 },
   1389                 &later
   1390             )
   1391             .await
   1392             .expect("transfer"),
   1393             TransferResult::Success {
   1394                 id: 2,
   1395                 initiated_at: later
   1396             }
   1397         );
   1398 
   1399         // Get
   1400         assert!(
   1401             db::transfer_by_id(&pool, 1, &CURRENCY, &ROOT)
   1402                 .await
   1403                 .unwrap()
   1404                 .is_some()
   1405         );
   1406         assert!(
   1407             db::transfer_by_id(&pool, 2, &CURRENCY, &ROOT)
   1408                 .await
   1409                 .unwrap()
   1410                 .is_some()
   1411         );
   1412         assert!(
   1413             db::transfer_by_id(&pool, 3, &CURRENCY, &ROOT)
   1414                 .await
   1415                 .unwrap()
   1416                 .is_none()
   1417         );
   1418         assert_eq!(
   1419             db::transfer_page(&pool, &None, &CURRENCY, &ROOT, &Page::default())
   1420                 .await
   1421                 .unwrap()
   1422                 .len(),
   1423             2
   1424         );
   1425     }
   1426 
   1427     #[tokio::test]
   1428     async fn bounce() {
   1429         let (mut db, _) = setup().await;
   1430 
   1431         let amount = decimal("10");
   1432         let now = now_sql_stable_ts();
   1433 
   1434         // Bounce
   1435         assert_eq!(
   1436             db::register_bounced_tx_in(
   1437                 &mut db,
   1438                 &TxIn {
   1439                     transfer_id: 12,
   1440                     tx_id: None,
   1441                     amount,
   1442                     subject: "subject".to_owned(),
   1443                     debtor_id: 31000163100000000,
   1444                     debtor_name: "Name".into(),
   1445                     valued_at: now
   1446                 },
   1447                 22,
   1448                 "good reason",
   1449                 &now
   1450             )
   1451             .await
   1452             .expect("bounce"),
   1453             BounceResult {
   1454                 tx_id: 1,
   1455                 tx_new: true
   1456             }
   1457         );
   1458         // Idempotent
   1459         assert_eq!(
   1460             db::register_bounced_tx_in(
   1461                 &mut db,
   1462                 &TxIn {
   1463                     transfer_id: 12,
   1464                     tx_id: None,
   1465                     amount,
   1466                     subject: "subject".to_owned(),
   1467                     debtor_id: 31000163100000000,
   1468                     debtor_name: "Name".into(),
   1469                     valued_at: now
   1470                 },
   1471                 22,
   1472                 "good reason",
   1473                 &now
   1474             )
   1475             .await
   1476             .expect("bounce"),
   1477             BounceResult {
   1478                 tx_id: 1,
   1479                 tx_new: false
   1480             }
   1481         );
   1482 
   1483         // Many
   1484         assert_eq!(
   1485             db::register_bounced_tx_in(
   1486                 &mut db,
   1487                 &TxIn {
   1488                     transfer_id: 13,
   1489                     tx_id: None,
   1490                     amount,
   1491                     subject: "subject".to_owned(),
   1492                     debtor_id: 31000163100000000,
   1493                     debtor_name: "Name".into(),
   1494                     valued_at: now
   1495                 },
   1496                 23,
   1497                 "good reason",
   1498                 &now
   1499             )
   1500             .await
   1501             .expect("bounce"),
   1502             BounceResult {
   1503                 tx_id: 2,
   1504                 tx_new: true
   1505             }
   1506         );
   1507     }
   1508 
   1509     #[tokio::test]
   1510     async fn status() {
   1511         let (mut db, pool) = setup().await;
   1512 
   1513         let check_status = async |id: u64, status: TransferState, msg: Option<&str>| {
   1514             let transfer = db::transfer_by_id(&pool, id, &CURRENCY, &ROOT)
   1515                 .await
   1516                 .unwrap()
   1517                 .unwrap();
   1518             assert_eq!(
   1519                 (status, msg),
   1520                 (transfer.status, transfer.status_msg.as_deref())
   1521             );
   1522         };
   1523 
   1524         // Unknown transfer
   1525         db::initiated_submit_permanent_failure(&mut db, 1, "msg")
   1526             .await
   1527             .unwrap();
   1528         db::initiated_submit_success(&mut db, 1, &Timestamp::now(), 12)
   1529             .await
   1530             .unwrap();
   1531         assert_eq!(
   1532             db::initiated_chargeback_failure(&mut db, 1).await.unwrap(),
   1533             ChargebackFailureResult::Unknown
   1534         );
   1535 
   1536         // Failure
   1537         db::make_transfer(
   1538             &pool,
   1539             &Transfer {
   1540                 request_uid: HashCode::rand(),
   1541                 amount: decimal("1"),
   1542                 exchange_base_url: url("https://exchange.test.com/"),
   1543                 metadata: None,
   1544                 wtid: ShortHashCode::rand(),
   1545                 creditor_id: 31000163100000000,
   1546                 creditor_name: "Name".into(),
   1547             },
   1548             &Timestamp::now(),
   1549         )
   1550         .await
   1551         .expect("transfer");
   1552         check_status(1, TransferState::pending, None).await;
   1553         db::initiated_submit_permanent_failure(&mut db, 1, "error status")
   1554             .await
   1555             .unwrap();
   1556         check_status(1, TransferState::permanent_failure, Some("error status")).await;
   1557 
   1558         // Success
   1559         db::make_transfer(
   1560             &pool,
   1561             &Transfer {
   1562                 request_uid: HashCode::rand(),
   1563                 amount: decimal("1"),
   1564                 exchange_base_url: url("https://exchange.test.com/"),
   1565                 metadata: None,
   1566                 wtid: ShortHashCode::rand(),
   1567                 creditor_id: 31000163100000000,
   1568                 creditor_name: "Name".into(),
   1569             },
   1570             &Timestamp::now(),
   1571         )
   1572         .await
   1573         .expect("transfer");
   1574         check_status(2, TransferState::pending, None).await;
   1575         db::initiated_submit_success(&mut db, 2, &Timestamp::now(), 3)
   1576             .await
   1577             .unwrap();
   1578         check_status(2, TransferState::pending, None).await;
   1579         db::register_tx_out(
   1580             &mut db,
   1581             &TxOut {
   1582                 transfer_id: 5,
   1583                 tx_id: Some(3),
   1584                 amount: decimal("2"),
   1585                 subject: "".to_string(),
   1586                 creditor_id: 31000163100000000,
   1587                 creditor_name: "Name".into(),
   1588                 valued_at: Timestamp::now(),
   1589             },
   1590             &TxOutKind::Simple,
   1591             &Timestamp::now(),
   1592         )
   1593         .await
   1594         .unwrap();
   1595         check_status(2, TransferState::success, None).await;
   1596 
   1597         // Chargeback
   1598         assert_eq!(
   1599             db::initiated_chargeback_failure(&mut db, 5).await.unwrap(),
   1600             ChargebackFailureResult::Known(2)
   1601         );
   1602         check_status(2, TransferState::late_failure, Some("charged back")).await;
   1603         assert_eq!(
   1604             db::initiated_chargeback_failure(&mut db, 5).await.unwrap(),
   1605             ChargebackFailureResult::Idempotent(2)
   1606         );
   1607     }
   1608 
   1609     #[tokio::test]
   1610     async fn batch() {
   1611         let (mut db, pool) = setup().await;
   1612         let start = Timestamp::now();
   1613 
   1614         // Empty db
   1615         let pendings = db::pending_batch(&mut db, &start)
   1616             .await
   1617             .expect("pending_batch");
   1618         assert_eq!(pendings.len(), 0);
   1619 
   1620         // Some transfers
   1621         for i in 0..3 {
   1622             db::make_transfer(
   1623                 &pool,
   1624                 &Transfer {
   1625                     request_uid: HashCode::rand(),
   1626                     amount: decimal(format!("{}", i + 1)),
   1627                     exchange_base_url: url("https://exchange.test.com/"),
   1628                     metadata: None,
   1629                     wtid: ShortHashCode::rand(),
   1630                     creditor_id: 31000163100000000,
   1631                     creditor_name: "Name".into(),
   1632                 },
   1633                 &Timestamp::now(),
   1634             )
   1635             .await
   1636             .expect("transfer");
   1637         }
   1638         let pendings = db::pending_batch(&mut db, &start)
   1639             .await
   1640             .expect("pending_batch");
   1641         assert_eq!(pendings.len(), 3);
   1642 
   1643         // Max 100 txs in batch
   1644         for i in 0..100 {
   1645             db::make_transfer(
   1646                 &pool,
   1647                 &Transfer {
   1648                     request_uid: HashCode::rand(),
   1649                     amount: decimal(format!("{}", i + 1)),
   1650                     exchange_base_url: url("https://exchange.test.com/"),
   1651                     metadata: None,
   1652                     wtid: ShortHashCode::rand(),
   1653                     creditor_id: 31000163100000000,
   1654                     creditor_name: "Name".into(),
   1655                 },
   1656                 &Timestamp::now(),
   1657             )
   1658             .await
   1659             .expect("transfer");
   1660         }
   1661         let pendings = db::pending_batch(&mut db, &start)
   1662             .await
   1663             .expect("pending_batch");
   1664         assert_eq!(pendings.len(), 100);
   1665 
   1666         // Skip uploaded
   1667         for i in 0..=10 {
   1668             db::initiated_submit_success(&mut db, i, &Timestamp::now(), i)
   1669                 .await
   1670                 .expect("status success");
   1671         }
   1672         let pendings = db::pending_batch(&mut db, &start)
   1673             .await
   1674             .expect("pending_batch");
   1675         assert_eq!(pendings.len(), 93);
   1676 
   1677         // Skip failed
   1678         for i in 0..=10 {
   1679             db::initiated_submit_permanent_failure(&mut db, 10 + i, "failure")
   1680                 .await
   1681                 .expect("status failure");
   1682         }
   1683         let pendings = db::pending_batch(&mut db, &start)
   1684             .await
   1685             .expect("pending_batch");
   1686         assert_eq!(pendings.len(), 83);
   1687     }
   1688 }