taler-rust

GNU Taler code in Rust. Largely core banking integrations.
Log | Files | Refs | Submodules | README | LICENSE

db.rs (49129B)


      1 /*
      2   This file is part of TALER
      3   Copyright (C) 2025, 2026 Taler Systems SA
      4 
      5   TALER is free software; you can redistribute it and/or modify it under the
      6   terms of the GNU Affero General Public License as published by the Free Software
      7   Foundation; either version 3, or (at your option) any later version.
      8 
      9   TALER is distributed in the hope that it will be useful, but WITHOUT ANY
     10   WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
     11   A PARTICULAR PURPOSE.  See the GNU Affero General Public License for more details.
     12 
     13   You should have received a copy of the GNU Affero General Public License along with
     14   TALER; see the file COPYING.  If not, see <http://www.gnu.org/licenses/>
     15 */
     16 
     17 use std::fmt::Display;
     18 
     19 use compact_str::CompactString;
     20 use jiff::Timestamp;
     21 use serde::{Serialize, de::DeserializeOwned};
     22 use sqlx::{PgConnection, PgPool, QueryBuilder, Row, postgres::PgRow};
     23 use taler_api::{
     24     db::{BindHelper, TypeHelper, history, page},
     25     serialized,
     26     subject::{IncomingSubject, OutgoingSubject, fmt_out_subject},
     27 };
     28 use taler_common::{
     29     api::{
     30         HashCode, ShortHashCode,
     31         params::{History, Page},
     32         prepared::{RegistrationRequest, Unregistration},
     33         revenue::RevenueIncomingBankTransaction,
     34         wire::{
     35             IncomingBankTransaction, OutgoingBankTransaction, TransferListStatus, TransferState,
     36             TransferStatus,
     37         },
     38     },
     39     config::Config,
     40     db::IncomingType,
     41     types::{
     42         amount::{Currency, Decimal},
     43         payto::{PaytoImpl as _, PaytoURI},
     44     },
     45 };
     46 use tokio::sync::watch::{Receiver, Sender};
     47 use url::Url;
     48 
     49 use crate::{
     50     config::parse_db_cfg,
     51     payto::{CyclosAccount, CyclosId, FullCyclosPayto},
     52 };
     53 
     54 const SCHEMA: &str = "cyclos";
     55 
     56 pub async fn pool(cfg: &Config) -> anyhow::Result<PgPool> {
     57     let db = parse_db_cfg(cfg)?;
     58     let pool = taler_common::db::pool(db.cfg, SCHEMA).await?;
     59     Ok(pool)
     60 }
     61 
     62 pub async fn dbinit(cfg: &Config, reset: bool) -> anyhow::Result<PgPool> {
     63     let db_cfg = parse_db_cfg(cfg)?;
     64     let pool = taler_common::db::pool(db_cfg.cfg, SCHEMA).await?;
     65     let mut db = pool.acquire().await?;
     66     taler_common::db::dbinit(&mut db, db_cfg.sql_dir.as_ref(), SCHEMA, reset).await?;
     67     Ok(pool)
     68 }
     69 
/// Forward notifications received through `pool` to the matching watch
/// channels.
///
/// Four notification names are handled: `tx_in`, `taler_in`, `tx_out` and
/// `taler_out`. Each one carries an `i64` row id that is published on the
/// corresponding sender with `send_replace`.
pub async fn notification_listener(
    pool: PgPool,
    in_channel: Sender<i64>,
    taler_in_channel: Sender<i64>,
    out_channel: Sender<i64>,
    taler_out_channel: Sender<i64>,
) {
    taler_api::notification::notification_listener!(&pool,
        "tx_in" => (row_id: i64) {
            in_channel.send_replace(row_id);
        },
        "taler_in" => (row_id: i64) {
            taler_in_channel.send_replace(row_id);
        },
        "tx_out" => (row_id: i64) {
            out_channel.send_replace(row_id);
        },
        "taler_out" => (row_id: i64) {
            taler_out_channel.send_replace(row_id);
        }
    )
}
     92 
     93 #[derive(Debug, Clone)]
     94 pub struct TxIn {
     95     pub transfer_id: i64,
     96     pub tx_id: Option<i64>,
     97     pub amount: Decimal,
     98     pub subject: String,
     99     pub debtor_id: i64,
    100     pub debtor_name: CompactString,
    101     pub valued_at: Timestamp,
    102 }
    103 
    104 impl Display for TxIn {
    105     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
    106         let Self {
    107             transfer_id,
    108             tx_id,
    109             amount,
    110             subject,
    111             valued_at,
    112             debtor_id,
    113             debtor_name,
    114         } = self;
    115         let tx_id = match tx_id {
    116             Some(id) => format_args!(":{}", *id),
    117             None => format_args!(""),
    118         };
    119         write!(
    120             f,
    121             "{valued_at} {transfer_id}{tx_id} {amount} ({debtor_id} {debtor_name}) '{subject}'"
    122         )
    123     }
    124 }
    125 
    126 #[derive(Debug, Clone)]
    127 pub struct TxOut {
    128     pub transfer_id: i64,
    129     pub tx_id: Option<i64>,
    130     pub amount: Decimal,
    131     pub subject: String,
    132     pub creditor_id: i64,
    133     pub creditor_name: CompactString,
    134     pub valued_at: Timestamp,
    135 }
    136 
    137 impl Display for TxOut {
    138     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
    139         let Self {
    140             transfer_id,
    141             tx_id,
    142             amount,
    143             subject,
    144             creditor_id,
    145             creditor_name,
    146             valued_at,
    147         } = self;
    148         let tx_id = match tx_id {
    149             Some(id) => format_args!(":{}", *id),
    150             None => format_args!(""),
    151         };
    152         write!(
    153             f,
    154             "{valued_at} {transfer_id}{tx_id} {amount} ({creditor_id} {creditor_name}) '{subject}'"
    155         )
    156     }
    157 }
    158 
    159 #[derive(Debug, PartialEq, Eq)]
    160 pub struct Initiated {
    161     pub id: i64,
    162     pub amount: Decimal,
    163     pub subject: String,
    164     pub creditor_id: i64,
    165     pub creditor_name: CompactString,
    166 }
    167 
    168 impl Display for Initiated {
    169     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
    170         let Self {
    171             id,
    172             amount,
    173             subject,
    174             creditor_id,
    175             creditor_name,
    176         } = self;
    177         write!(
    178             f,
    179             "{id} {amount} ({creditor_id} {creditor_name}) '{subject}'"
    180         )
    181     }
    182 }
    183 
/// Incoming transaction handed to [`register_tx_in_admin`].
///
/// Unlike [`TxIn`] it carries no transfer id, transaction id or value date:
/// `register_tx_in_admin` passes `NULL` for the ids and the current time for
/// the date.
#[derive(Debug, Clone)]
pub struct TxInAdmin {
    pub amount: Decimal,
    pub subject: String,
    pub debtor_id: i64,
    pub debtor_name: CompactString,
    /// Parsed subject; its `ty()` and `key()` are bound to the SQL call.
    pub metadata: IncomingSubject,
}
    192 
/// Outcome of the `register_tx_in` SQL function.
#[derive(Debug, PartialEq, Eq)]
pub enum AddIncomingResult {
    /// None of the failure flags was set.
    Success {
        /// Value of the `out_new` column.
        new: bool,
        /// Value of the `out_pending` column.
        pending: bool,
        /// Value of the `out_tx_row_id` column.
        row_id: u64,
        /// Value of the `out_valued_at` column.
        valued_at: Timestamp,
    },
    /// `out_reserve_pub_reuse` was set.
    ReservePubReuse,
    /// `out_unknown_mapping` was set.
    UnknownMapping,
    /// `out_mapping_reuse` was set.
    MappingReuse,
}
    205 
    206 /// Lock the database for worker execution
    207 pub async fn worker_lock(e: &mut PgConnection) -> sqlx::Result<bool> {
    208     sqlx::query("SELECT pg_try_advisory_lock(42)")
    209         .try_map(|r: PgRow| r.try_get(0))
    210         .fetch_one(e)
    211         .await
    212 }
    213 
    214 pub async fn register_tx_in_admin(
    215     db: &PgPool,
    216     tx: &TxInAdmin,
    217     now: &Timestamp,
    218 ) -> sqlx::Result<AddIncomingResult> {
    219     serialized!(
    220         sqlx::query(
    221             "
    222             SELECT out_reserve_pub_reuse, out_mapping_reuse, out_unknown_mapping, out_tx_row_id, out_valued_at, out_new, out_pending
    223             FROM register_tx_in(NULL, NULL, $1, $2, $3, $4, $5, $6, $7, $5)
    224         ",
    225         )
    226         .bind(tx.amount)
    227         .bind(&tx.subject)
    228         .bind(tx.debtor_id)
    229         .bind(&tx.debtor_name)
    230         .bind_timestamp(now)
    231         .bind(tx.metadata.ty())
    232         .bind(tx.metadata.key())
    233        .try_map(|r: PgRow| {
    234             Ok(if r.try_get_flag(0)? {
    235                 AddIncomingResult::ReservePubReuse
    236             } else if r.try_get_flag(1)? {
    237                 AddIncomingResult::MappingReuse
    238             } else if r.try_get_flag(2)? {
    239                 AddIncomingResult::UnknownMapping
    240             } else {
    241                 AddIncomingResult::Success {
    242                     row_id: r.try_get_u64(3)?,
    243                     valued_at: r.try_get_timestamp(4)?,
    244                     new: r.try_get(5)?,
    245                     pending: r.try_get(6)?
    246                 }
    247             })
    248         })
    249         .fetch_one(db)
    250     )
    251 }
    252 
    253 pub async fn register_tx_in(
    254     db: &mut PgConnection,
    255     tx: &TxIn,
    256     subject: &Option<IncomingSubject>,
    257     now: &Timestamp,
    258 ) -> sqlx::Result<AddIncomingResult> {
    259     serialized!(
    260         sqlx::query(
    261             "
    262             SELECT out_reserve_pub_reuse, out_mapping_reuse, out_unknown_mapping, out_tx_row_id, out_valued_at, out_new, out_pending
    263             FROM register_tx_in($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
    264         ",
    265         )
    266         .bind(tx.transfer_id)
    267         .bind(tx.tx_id)
    268         .bind(tx.amount)
    269         .bind(&tx.subject)
    270         .bind(tx.debtor_id)
    271         .bind(&tx.debtor_name)
    272         .bind(tx.valued_at.as_microsecond())
    273         .bind(subject.as_ref().map(|it| it.ty()))
    274         .bind(subject.as_ref().map(|it| it.key()))
    275         .bind(now.as_microsecond())
    276         .try_map(|r: PgRow| {
    277             Ok(if r.try_get_flag(0)? {
    278                 AddIncomingResult::ReservePubReuse
    279             } else if r.try_get_flag(1)? {
    280                 AddIncomingResult::MappingReuse
    281             } else if r.try_get_flag(2)? {
    282                 AddIncomingResult::UnknownMapping
    283             } else {
    284                 AddIncomingResult::Success {
    285                     row_id: r.try_get_u64(3)?,
    286                     valued_at: r.try_get_timestamp(4)?,
    287                     new: r.try_get(5)?,
    288                     pending: r.try_get(6)?
    289                 }
    290             })
    291         })
    292         .fetch_one(&mut *db)
    293     )
    294 }
    295 
/// Kind of outgoing transaction handed to [`register_tx_out`].
///
/// It selects which of the optional SQL arguments (wtid, exchange base URL,
/// metadata, bounced id) are bound to a value instead of `NULL`.
#[derive(Debug)]
pub enum TxOutKind {
    /// All four optional arguments are `NULL`.
    Simple,
    /// Only the bounced id argument is set, to the wrapped value.
    Bounce(i64),
    /// The wtid, exchange base URL and metadata arguments are taken from the subject.
    Talerable(OutgoingSubject),
}
    302 
/// Result kind decoded from the first column returned by `register_tx_out`.
///
/// Mapped by sqlx to the database type named `register_result`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, sqlx::Type)]
// NOTE(review): lowercase variants presumably mirror the labels of the
// database type — confirm before renaming.
#[allow(non_camel_case_types)]
#[sqlx(type_name = "register_result")]
pub enum RegisterResult {
    /// Already registered
    idempotent,
    /// Initiated transaction
    known,
    /// Recovered unknown outgoing transaction
    recovered,
}
    314 
/// Row returned by the `register_tx_out` SQL function.
#[derive(Debug, PartialEq, Eq)]
pub struct AddOutgoingResult {
    /// Value of the `out_result` column.
    pub result: RegisterResult,
    /// Value of the `out_tx_row_id` column.
    pub row_id: i64,
}
    320 
    321 pub async fn register_tx_out(
    322     db: &mut PgConnection,
    323     tx: &TxOut,
    324     kind: &TxOutKind,
    325     now: &Timestamp,
    326 ) -> sqlx::Result<AddOutgoingResult> {
    327     serialized!({
    328         let query = sqlx::query(
    329             "
    330                 SELECT out_result, out_tx_row_id
    331                 FROM register_tx_out($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)
    332             ",
    333         )
    334         .bind(tx.transfer_id)
    335         .bind(tx.tx_id)
    336         .bind(tx.amount)
    337         .bind(&tx.subject)
    338         .bind(tx.creditor_id)
    339         .bind(&tx.creditor_name)
    340         .bind_timestamp(&tx.valued_at);
    341         let query = match kind {
    342             TxOutKind::Simple => query
    343                 .bind(None::<&[u8]>)
    344                 .bind(None::<&str>)
    345                 .bind(None::<&str>)
    346                 .bind(None::<i64>),
    347             TxOutKind::Bounce(bounced) => query
    348                 .bind(None::<&[u8]>)
    349                 .bind(None::<&str>)
    350                 .bind(None::<&str>)
    351                 .bind(*bounced),
    352             TxOutKind::Talerable(subject) => query
    353                 .bind(&subject.wtid)
    354                 .bind(subject.exchange_base_url.as_str())
    355                 .bind(&subject.metadata)
    356                 .bind(None::<i64>),
    357         };
    358         query
    359             .bind_timestamp(now)
    360             .try_map(|r: PgRow| {
    361                 Ok(AddOutgoingResult {
    362                     result: r.try_get(0)?,
    363                     row_id: r.try_get(1)?,
    364                 })
    365             })
    366             .fetch_one(&mut *db)
    367     })
    368 }
    369 
/// Outcome of the `taler_transfer` SQL function called by [`make_transfer`].
#[derive(Debug, PartialEq, Eq)]
pub enum TransferResult {
    /// No reuse flag was set; `id` is `out_initiated_row_id` and
    /// `initiated_at` is `out_initiated_at`.
    Success { id: u64, initiated_at: Timestamp },
    /// `out_request_uid_reuse` was set.
    RequestUidReuse,
    /// `out_wtid_reuse` was set.
    WtidReuse,
}
    376 
/// Transfer request handed to [`make_transfer`].
///
/// `wtid`, `exchange_base_url` and `metadata` are also used to format the
/// outgoing subject with `fmt_out_subject`.
#[derive(Debug, Clone)]
pub struct Transfer {
    pub request_uid: HashCode,
    pub amount: Decimal,
    pub exchange_base_url: Url,
    pub metadata: Option<CompactString>,
    pub wtid: ShortHashCode,
    pub creditor_id: i64,
    pub creditor_name: CompactString,
}
    387 
    388 pub async fn make_transfer(
    389     db: &PgPool,
    390     tx: &Transfer,
    391     now: &Timestamp,
    392 ) -> sqlx::Result<TransferResult> {
    393     let subject = fmt_out_subject(&tx.wtid, &tx.exchange_base_url, tx.metadata.as_deref());
    394     serialized!(
    395         sqlx::query(
    396             "
    397                 SELECT out_request_uid_reuse, out_wtid_reuse, out_initiated_row_id, out_initiated_at
    398                 FROM taler_transfer($1, $2, $3, $4, $5, $6, $7, $8, $9)
    399             ",
    400         )
    401         .bind(&tx.request_uid)
    402         .bind(&tx.wtid)
    403         .bind(&subject)
    404         .bind(tx.amount)
    405         .bind(tx.exchange_base_url.as_str())
    406         .bind(&tx.metadata)
    407         .bind(tx.creditor_id)
    408         .bind(&tx.creditor_name)
    409         .bind_timestamp(now)
    410         .try_map(|r: PgRow| {
    411             Ok(if r.try_get_flag(0)? {
    412                 TransferResult::RequestUidReuse
    413             } else if r.try_get_flag(1)? {
    414                 TransferResult::WtidReuse
    415             } else {
    416                 TransferResult::Success {
    417                     id: r.try_get_u64(2)?,
    418                     initiated_at: r.try_get_timestamp(3)?,
    419                 }
    420             })
    421         })
    422         .fetch_one(db)
    423     )
    424 }
    425 
/// Row returned by the `register_bounced_tx_in` SQL function.
#[derive(Debug, PartialEq, Eq)]
pub struct BounceResult {
    /// Value of the `out_tx_row_id` column.
    pub tx_id: u64,
    /// Value of the `out_tx_new` column.
    pub tx_new: bool,
}
    431 
    432 pub async fn register_bounced_tx_in(
    433     db: &mut PgConnection,
    434     tx: &TxIn,
    435     chargeback_id: i64,
    436     reason: &str,
    437     now: &Timestamp,
    438 ) -> sqlx::Result<BounceResult> {
    439     serialized!(
    440         sqlx::query(
    441             "
    442                 SELECT out_tx_row_id, out_tx_new
    443                 FROM register_bounced_tx_in($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
    444             ",
    445         )
    446         .bind(tx.transfer_id)
    447         .bind(tx.tx_id)
    448         .bind(tx.amount)
    449         .bind(&tx.subject)
    450         .bind(tx.debtor_id)
    451         .bind(&tx.debtor_name)
    452         .bind_timestamp(&tx.valued_at)
    453         .bind(chargeback_id)
    454         .bind(reason)
    455         .bind_timestamp(now)
    456         .try_map(|r: PgRow| {
    457             Ok(BounceResult {
    458                 tx_id: r.try_get_u64(0)?,
    459                 tx_new: r.try_get(1)?,
    460             })
    461         })
    462         .fetch_one(&mut *db)
    463     )
    464 }
    465 
    466 pub async fn transfer_page(
    467     db: &PgPool,
    468     status: &Option<TransferState>,
    469     currency: &Currency,
    470     root: &CompactString,
    471     params: &Page,
    472 ) -> sqlx::Result<Vec<TransferListStatus>> {
    473     page(
    474         db,
    475         params,
    476         "initiated_id",
    477         || {
    478             let mut builder = QueryBuilder::new(
    479                 "
    480                     SELECT
    481                         initiated_id,
    482                         status,
    483                         amount,
    484                         credit_account,
    485                         credit_name,
    486                         initiated_at
    487                     FROM transfer
    488                     JOIN initiated USING (initiated_id)
    489                     WHERE
    490                 ",
    491             );
    492             if let Some(status) = status {
    493                 builder.push(" status = ").push_bind(status).push(" AND ");
    494             }
    495             builder
    496         },
    497         |r: PgRow| {
    498             Ok(TransferListStatus {
    499                 row_id: r.try_get_u64(0)?,
    500                 status: r.try_get(1)?,
    501                 amount: r.try_get_amount(2, currency)?,
    502                 credit_account: r.try_get_cyclos_fullpaytouri(3, 4, root)?,
    503                 timestamp: r.try_get_timestamp(5)?.into(),
    504             })
    505         },
    506     )
    507     .await
    508 }
    509 
    510 pub async fn outgoing_history(
    511     db: &PgPool,
    512     params: &History,
    513     currency: &Currency,
    514     root: &CompactString,
    515     listen: impl FnOnce() -> Receiver<i64>,
    516 ) -> sqlx::Result<Vec<OutgoingBankTransaction>> {
    517     history(
    518         db,
    519         "tx_out_id",
    520         params,
    521         listen,
    522         || {
    523             QueryBuilder::new(
    524                 "
    525                 SELECT
    526                     tx_out_id,
    527                     amount,
    528                     credit_account,
    529                     credit_name,
    530                     valued_at,
    531                     exchange_base_url,
    532                     metadata,
    533                     wtid
    534                 FROM taler_out
    535                 JOIN tx_out USING (tx_out_id)
    536                 WHERE
    537             ",
    538             )
    539         },
    540         |r: PgRow| {
    541             Ok(OutgoingBankTransaction {
    542                 row_id: r.try_get_u64(0)?,
    543                 amount: r.try_get_amount(1, currency)?,
    544                 debit_fee: None,
    545                 credit_account: r.try_get_cyclos_fullpaytouri(2, 3, root)?,
    546                 date: r.try_get_timestamp(4)?.into(),
    547                 exchange_base_url: r.try_get_url(5)?,
    548                 metadata: r.try_get(6)?,
    549                 wtid: r.try_get(7)?,
    550             })
    551         },
    552     )
    553     .await
    554 }
    555 
    556 pub async fn incoming_history(
    557     db: &PgPool,
    558     params: &History,
    559     currency: &Currency,
    560     root: &CompactString,
    561     listen: impl FnOnce() -> Receiver<i64>,
    562 ) -> sqlx::Result<Vec<IncomingBankTransaction>> {
    563     history(
    564         db,
    565         "tx_in_id",
    566         params,
    567         listen,
    568         || {
    569             QueryBuilder::new(
    570                 "
    571                 SELECT
    572                     type,
    573                     tx_in_id,
    574                     amount,
    575                     debit_account,
    576                     debit_name,
    577                     valued_at,
    578                     metadata,
    579                     authorization_pub,
    580                     authorization_sig
    581                 FROM taler_in
    582                 JOIN tx_in USING (tx_in_id)
    583                 WHERE
    584             ",
    585             )
    586         },
    587         |r: PgRow| {
    588             Ok(match r.try_get(0)? {
    589                 IncomingType::reserve => IncomingBankTransaction::Reserve {
    590                     row_id: r.try_get_u64(1)?,
    591                     amount: r.try_get_amount(2, currency)?,
    592                     credit_fee: None,
    593                     debit_account: r.try_get_cyclos_fullpaytouri(3, 4, root)?,
    594                     date: r.try_get_timestamp(5)?.into(),
    595                     reserve_pub: r.try_get(6)?,
    596                     authorization_pub: r.try_get(7)?,
    597                     authorization_sig: r.try_get(8)?,
    598                 },
    599                 IncomingType::kyc => IncomingBankTransaction::Kyc {
    600                     row_id: r.try_get_u64(1)?,
    601                     amount: r.try_get_amount(2, currency)?,
    602                     credit_fee: None,
    603                     debit_account: r.try_get_cyclos_fullpaytouri(3, 4, root)?,
    604                     date: r.try_get_timestamp(5)?.into(),
    605                     account_pub: r.try_get(6)?,
    606                     authorization_pub: r.try_get(7)?,
    607                     authorization_sig: r.try_get(8)?,
    608                 },
    609                 IncomingType::map => unimplemented!("MAP are never listed in the history"),
    610             })
    611         },
    612     )
    613     .await
    614 }
    615 
    616 pub async fn revenue_history(
    617     db: &PgPool,
    618     params: &History,
    619     currency: &Currency,
    620     root: &CompactString,
    621     listen: impl FnOnce() -> Receiver<i64>,
    622 ) -> sqlx::Result<Vec<RevenueIncomingBankTransaction>> {
    623     history(
    624         db,
    625         "tx_in_id",
    626         params,
    627         listen,
    628         || {
    629             QueryBuilder::new(
    630                 "
    631                 SELECT
    632                     tx_in_id,
    633                     valued_at,
    634                     amount,
    635                     debit_account,
    636                     debit_name,
    637                     subject
    638                 FROM tx_in
    639                 WHERE
    640             ",
    641             )
    642         },
    643         |r: PgRow| {
    644             Ok(RevenueIncomingBankTransaction {
    645                 row_id: r.try_get_u64(0)?,
    646                 date: r.try_get_timestamp(1)?.into(),
    647                 amount: r.try_get_amount(2, currency)?,
    648                 credit_fee: None,
    649                 debit_account: r.try_get_cyclos_fullpaytouri(3, 4, root)?,
    650                 subject: r.try_get(5)?,
    651             })
    652         },
    653     )
    654     .await
    655 }
    656 
    657 pub async fn transfer_by_id(
    658     db: &PgPool,
    659     id: u64,
    660     currency: &Currency,
    661     root: &CompactString,
    662 ) -> sqlx::Result<Option<TransferStatus>> {
    663     serialized!(
    664         sqlx::query(
    665             "
    666                 SELECT
    667                     status,
    668                     status_msg,
    669                     amount,
    670                     exchange_base_url,
    671                     metadata,
    672                     wtid,
    673                     credit_account,
    674                     credit_name,
    675                     initiated_at
    676                 FROM transfer
    677                 JOIN initiated USING (initiated_id)
    678                 WHERE initiated_id = $1
    679             ",
    680         )
    681         .bind(id as i64)
    682         .try_map(|r: PgRow| {
    683             Ok(TransferStatus {
    684                 status: r.try_get(0)?,
    685                 status_msg: r.try_get(1)?,
    686                 amount: r.try_get_amount(2, currency)?,
    687                 origin_exchange_url: r.try_get(3)?,
    688                 metadata: r.try_get(4)?,
    689                 wtid: r.try_get(5)?,
    690                 credit_account: r.try_get_cyclos_fullpaytouri(6, 7, root)?,
    691                 timestamp: r.try_get_timestamp(8)?.into(),
    692             })
    693         })
    694         .fetch_optional(db)
    695     )
    696 }
    697 
    698 /** Get a batch of pending initiated transactions not attempted since [start] */
    699 pub async fn pending_batch(
    700     db: &mut PgConnection,
    701     start: &Timestamp,
    702 ) -> sqlx::Result<Vec<Initiated>> {
    703     serialized!(
    704         sqlx::query(
    705             "
    706             SELECT initiated_id, amount, subject, credit_account, credit_name
    707             FROM initiated
    708             WHERE tx_id IS NULL
    709                 AND status='pending'
    710                 AND (last_submitted IS NULL OR last_submitted < $1)
    711             LIMIT 100
    712         ",
    713         )
    714         .bind_timestamp(start)
    715         .try_map(|r: PgRow| {
    716             Ok(Initiated {
    717                 id: r.try_get(0)?,
    718                 amount: r.try_get(1)?,
    719                 subject: r.try_get(2)?,
    720                 creditor_id: r.try_get(3)?,
    721                 creditor_name: r.try_get(4)?,
    722             })
    723         })
    724         .fetch_all(&mut *db)
    725     )
    726 }
    727 
    728 /** Update status of a successful submitted initiated transaction */
    729 pub async fn initiated_submit_success(
    730     db: &mut PgConnection,
    731     initiated_id: i64,
    732     timestamp: &Timestamp,
    733     tx_id: i64,
    734 ) -> sqlx::Result<()> {
    735     serialized!(
    736         sqlx::query(
    737             "
    738                 UPDATE initiated
    739                 SET status='pending', submission_counter=submission_counter+1, last_submitted=$1, tx_id=$2
    740                 WHERE initiated_id=$3
    741             "
    742         )
    743         .bind_timestamp(timestamp)
    744         .bind(tx_id)
    745         .bind(initiated_id)
    746         .execute(&mut *db)
    747     )?;
    748     Ok(())
    749 }
    750 
    751 /** Update status of a permanently failed initiated transaction */
    752 pub async fn initiated_submit_permanent_failure(
    753     db: &mut PgConnection,
    754     initiated_id: i64,
    755     msg: &str,
    756 ) -> sqlx::Result<()> {
    757     serialized!(
    758         sqlx::query(
    759             "
    760                 UPDATE initiated
    761                 SET status='permanent_failure', status_msg=$1
    762                 WHERE initiated_id=$2
    763             ",
    764         )
    765         .bind(msg)
    766         .bind(initiated_id)
    767         .execute(&mut *db)
    768     )?;
    769     Ok(())
    770 }
    771 
/// Outcome of [`initiated_chargeback_failure`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ChargebackFailureResult {
    /// No row was returned, or the returned initiated id was `0`.
    Unknown,
    /// An initiated id was returned and `out_new` was true.
    Known(u64),
    /// An initiated id was returned and `out_new` was false.
    Idempotent(u64),
}
    778 
    779 /** Update status of a charged back initiated transaction */
    780 pub async fn initiated_chargeback_failure(
    781     db: &mut PgConnection,
    782     transfer_id: i64,
    783 ) -> sqlx::Result<ChargebackFailureResult> {
    784     Ok(serialized!(
    785         sqlx::query("SELECT out_initiated_id, out_new FROM register_charge_back_failure($1)")
    786             .bind(transfer_id)
    787             .try_map(|r: PgRow| {
    788                 let id = r.try_get_u64(0)?;
    789                 Ok(if id == 0 {
    790                     ChargebackFailureResult::Unknown
    791                 } else if r.try_get(1)? {
    792                     ChargebackFailureResult::Known(id)
    793                 } else {
    794                     ChargebackFailureResult::Idempotent(id)
    795                 })
    796             })
    797             .fetch_optional(&mut *db)
    798     )?
    799     .unwrap_or(ChargebackFailureResult::Unknown))
    800 }
    801 
    802 /** Get JSON value from KV table */
    803 pub async fn kv_get<T: DeserializeOwned + Unpin + Send>(
    804     db: &mut PgConnection,
    805     key: &str,
    806 ) -> sqlx::Result<Option<T>> {
    807     serialized!(
    808         sqlx::query("SELECT value FROM kv WHERE key=$1")
    809             .bind(key)
    810             .try_map(|r| Ok(r.try_get::<sqlx::types::Json<T>, _>(0)?.0))
    811             .fetch_optional(&mut *db)
    812     )
    813 }
    814 
    815 /** Set JSON value in KV table */
    816 pub async fn kv_set<T: Serialize>(db: &mut PgConnection, key: &str, value: &T) -> sqlx::Result<()> {
    817     serialized!(
    818         sqlx::query("INSERT INTO kv (key, value) VALUES ($1, $2) ON CONFLICT (key) DO UPDATE SET value=EXCLUDED.value")
    819             .bind(key)
    820             .bind(sqlx::types::Json(value))
    821             .execute(&mut *db)
    822     )?;
    823     Ok(())
    824 }
    825 
    826 pub enum RegistrationResult {
    827     Success,
    828     ReservePubReuse,
    829 }
    830 
    831 pub async fn transfer_register(
    832     db: &PgPool,
    833     req: &RegistrationRequest,
    834 ) -> sqlx::Result<RegistrationResult> {
    835     let ty: IncomingType = req.r#type.into();
    836     serialized!(
    837         sqlx::query(
    838             "SELECT out_reserve_pub_reuse FROM register_prepared_transfers($1,$2,$3,$4,$5,$6)"
    839         )
    840         .bind(ty)
    841         .bind(&req.account_pub)
    842         .bind(&req.authorization_pub)
    843         .bind(&req.authorization_sig)
    844         .bind(req.recurrent)
    845         .bind_timestamp(&Timestamp::now())
    846         .try_map(|r: PgRow| {
    847             Ok(if r.try_get_flag("out_reserve_pub_reuse")? {
    848                 RegistrationResult::ReservePubReuse
    849             } else {
    850                 RegistrationResult::Success
    851             })
    852         })
    853         .fetch_one(db)
    854     )
    855 }
    856 
    857 pub async fn transfer_unregister(db: &PgPool, req: &Unregistration) -> sqlx::Result<bool> {
    858     serialized!(
    859         sqlx::query("SELECT out_found FROM delete_prepared_transfers($1)")
    860             .bind(&req.authorization_pub)
    861             .try_map(|r: PgRow| r.try_get_flag("out_found"))
    862             .fetch_one(db)
    863     )
    864 }
    865 
/// Row helpers that build Cyclos payto values from two columns.
///
/// In both methods `idx` designates the column holding the account id,
/// `name` the column holding the account name, and `root` is cloned into
/// the resulting account.
pub trait CyclosTypeHelper {
    /// Decode the two columns into a [`FullCyclosPayto`].
    fn try_get_cyclos_fullpayto<I: sqlx::ColumnIndex<Self>>(
        &self,
        idx: I,
        name: I,
        root: &CompactString,
    ) -> sqlx::Result<FullCyclosPayto>;
    /// Decode the two columns into a full payto URI.
    fn try_get_cyclos_fullpaytouri<I: sqlx::ColumnIndex<Self>>(
        &self,
        idx: I,
        name: I,
        root: &CompactString,
    ) -> sqlx::Result<PaytoURI>;
}
    880 
    881 impl CyclosTypeHelper for PgRow {
    882     fn try_get_cyclos_fullpayto<I: sqlx::ColumnIndex<Self>>(
    883         &self,
    884         idx: I,
    885         name: I,
    886         root: &CompactString,
    887     ) -> sqlx::Result<FullCyclosPayto> {
    888         let idx = self.try_get(idx)?;
    889         let name = self.try_get(name)?;
    890         Ok(FullCyclosPayto::new(
    891             CyclosAccount {
    892                 id: CyclosId(idx),
    893                 root: root.clone(),
    894             },
    895             name,
    896         ))
    897     }
    898     fn try_get_cyclos_fullpaytouri<I: sqlx::ColumnIndex<Self>>(
    899         &self,
    900         idx: I,
    901         name: I,
    902         root: &CompactString,
    903     ) -> sqlx::Result<PaytoURI> {
    904         let idx = self.try_get(idx)?;
    905         let name = self.try_get(name)?;
    906         Ok(CyclosAccount {
    907             id: CyclosId(idx),
    908             root: root.clone(),
    909         }
    910         .as_full_uri(name))
    911     }
    912 }
    913 
    914 #[cfg(test)]
    915 mod test {
    916     use std::assert_matches;
    917 
    918     use compact_str::CompactString;
    919     use jiff::{Span, Timestamp};
    920     use serde_json::json;
    921     use sqlx::{PgPool, Postgres, Row as _, pool::PoolConnection, postgres::PgRow};
    922     use taler_api::{
    923         db::TypeHelper,
    924         notification::dummy_listen,
    925         subject::{IncomingSubject, OutgoingSubject},
    926     };
    927     use taler_common::{
    928         api::{
    929             EddsaPublicKey, HashCode, ShortHashCode,
    930             params::{History, Page},
    931             wire::TransferState,
    932         },
    933         types::{
    934             amount::{Currency, decimal},
    935             url,
    936             utils::now_sql_stable_ts,
    937         },
    938     };
    939 
    940     use crate::{
    941         constants::CONFIG_SOURCE,
    942         db::{
    943             self, AddIncomingResult, AddOutgoingResult, BounceResult, ChargebackFailureResult,
    944             Transfer, TransferResult, TxIn, TxInAdmin, TxOut, TxOutKind,
    945         },
    946     };
    947 
    948     pub const CURR: Currency = Currency::TEST;
    949     pub const ROOT: CompactString = CompactString::const_new("localhost");
    950 
    951     async fn setup() -> (PoolConnection<Postgres>, PgPool) {
    952         taler_test_utils::db::db_test_setup(CONFIG_SOURCE).await
    953     }
    954 
    955     #[tokio::test]
    956     async fn kv() {
    957         let (mut db, _) = setup().await;
    958 
    959         let value = json!({
    960             "name": "Mr Smith",
    961             "no way": 32
    962         });
    963 
    964         assert_eq!(
    965             db::kv_get::<serde_json::Value>(&mut db, "value")
    966                 .await
    967                 .unwrap(),
    968             None
    969         );
    970         db::kv_set(&mut db, "value", &value).await.unwrap();
    971         db::kv_set(&mut db, "value", &value).await.unwrap();
    972         assert_eq!(
    973             db::kv_get::<serde_json::Value>(&mut db, "value")
    974                 .await
    975                 .unwrap(),
    976             Some(value)
    977         );
    978     }
    979 
    980     #[tokio::test]
    981     async fn tx_in() {
    982         let (mut db, pool) = setup().await;
    983 
    984         let mut routine = async |first: &Option<IncomingSubject>,
    985                                  second: &Option<IncomingSubject>| {
    986             let id = sqlx::query("SELECT count(*) + 1 FROM tx_in")
    987                 .try_map(|r: PgRow| r.try_get_u64(0))
    988                 .fetch_one(&mut *db)
    989                 .await
    990                 .unwrap();
    991             let now = now_sql_stable_ts();
    992             let later = now + Span::new().hours(2);
    993             let tx = TxIn {
    994                 transfer_id: now.as_microsecond() as i64,
    995                 tx_id: None,
    996                 amount: decimal("10"),
    997                 subject: "subject".to_owned(),
    998                 debtor_id: 31000163100000000,
    999                 debtor_name: "Name".into(),
   1000                 valued_at: now,
   1001             };
   1002             // Insert
   1003             assert_eq!(
   1004                 db::register_tx_in(&mut db, &tx, first, &now)
   1005                     .await
   1006                     .expect("register tx in"),
   1007                 AddIncomingResult::Success {
   1008                     new: true,
   1009                     pending: false,
   1010                     row_id: id,
   1011                     valued_at: now,
   1012                 }
   1013             );
   1014             // Idempotent
   1015             assert_eq!(
   1016                 db::register_tx_in(
   1017                     &mut db,
   1018                     &TxIn {
   1019                         valued_at: later,
   1020                         ..tx.clone()
   1021                     },
   1022                     first,
   1023                     &now
   1024                 )
   1025                 .await
   1026                 .expect("register tx in"),
   1027                 AddIncomingResult::Success {
   1028                     new: false,
   1029                     pending: false,
   1030                     row_id: id,
   1031                     valued_at: now
   1032                 }
   1033             );
   1034             // Many
   1035             assert_eq!(
   1036                 db::register_tx_in(
   1037                     &mut db,
   1038                     &TxIn {
   1039                         transfer_id: later.as_microsecond() as i64,
   1040                         valued_at: later,
   1041                         ..tx
   1042                     },
   1043                     second,
   1044                     &now
   1045                 )
   1046                 .await
   1047                 .expect("register tx in"),
   1048                 AddIncomingResult::Success {
   1049                     new: true,
   1050                     pending: false,
   1051                     row_id: id + 1,
   1052                     valued_at: later
   1053                 }
   1054             );
   1055         };
   1056 
   1057         // Empty db
   1058         assert_eq!(
   1059             db::revenue_history(&pool, &History::default(), &CURR, &ROOT, dummy_listen)
   1060                 .await
   1061                 .unwrap(),
   1062             Vec::new()
   1063         );
   1064         assert_eq!(
   1065             db::incoming_history(&pool, &History::default(), &CURR, &ROOT, dummy_listen)
   1066                 .await
   1067                 .unwrap(),
   1068             Vec::new()
   1069         );
   1070 
   1071         // Regular transaction
   1072         routine(&None, &None).await;
   1073 
   1074         let first = EddsaPublicKey::rand();
   1075         let second = EddsaPublicKey::rand();
   1076 
   1077         // Reserve transaction
   1078         routine(
   1079             &Some(IncomingSubject::Reserve(first.clone())),
   1080             &Some(IncomingSubject::Reserve(second)),
   1081         )
   1082         .await;
   1083 
   1084         // Kyc transaction
   1085         routine(
   1086             &Some(IncomingSubject::Kyc(first.clone())),
   1087             &Some(IncomingSubject::Kyc(first)),
   1088         )
   1089         .await;
   1090 
   1091         // History
   1092         assert_eq!(
   1093             db::revenue_history(&pool, &History::default(), &CURR, &ROOT, dummy_listen)
   1094                 .await
   1095                 .unwrap()
   1096                 .len(),
   1097             6
   1098         );
   1099         assert_eq!(
   1100             db::incoming_history(&pool, &History::default(), &CURR, &ROOT, dummy_listen)
   1101                 .await
   1102                 .unwrap()
   1103                 .len(),
   1104             4
   1105         );
   1106     }
   1107 
   1108     #[tokio::test]
   1109     async fn tx_in_admin() {
   1110         let (_, pool) = setup().await;
   1111 
   1112         // Empty db
   1113         assert_eq!(
   1114             db::incoming_history(&pool, &History::default(), &CURR, &ROOT, dummy_listen)
   1115                 .await
   1116                 .unwrap(),
   1117             Vec::new()
   1118         );
   1119 
   1120         let now = now_sql_stable_ts();
   1121         let later = now + Span::new().hours(2);
   1122         let tx = TxInAdmin {
   1123             amount: decimal("10"),
   1124             subject: "subject".to_owned(),
   1125             debtor_id: 31000163100000000,
   1126             debtor_name: "Name".into(),
   1127             metadata: IncomingSubject::Reserve(EddsaPublicKey::rand()),
   1128         };
   1129         // Insert
   1130         assert_eq!(
   1131             db::register_tx_in_admin(&pool, &tx, &now)
   1132                 .await
   1133                 .expect("register tx in"),
   1134             AddIncomingResult::Success {
   1135                 new: true,
   1136                 pending: false,
   1137                 row_id: 1,
   1138                 valued_at: now
   1139             }
   1140         );
   1141         // Many
   1142         assert_eq!(
   1143             db::register_tx_in_admin(
   1144                 &pool,
   1145                 &TxInAdmin {
   1146                     subject: "Other".to_owned(),
   1147                     metadata: IncomingSubject::Reserve(EddsaPublicKey::rand()),
   1148                     ..tx.clone()
   1149                 },
   1150                 &later
   1151             )
   1152             .await
   1153             .expect("register tx in"),
   1154             AddIncomingResult::Success {
   1155                 new: true,
   1156                 pending: false,
   1157                 row_id: 2,
   1158                 valued_at: later
   1159             }
   1160         );
   1161 
   1162         // History
   1163         assert_eq!(
   1164             db::incoming_history(&pool, &History::default(), &CURR, &ROOT, dummy_listen)
   1165                 .await
   1166                 .unwrap()
   1167                 .len(),
   1168             2
   1169         );
   1170     }
   1171 
   1172     #[tokio::test]
   1173     async fn tx_out() {
   1174         let (mut db, pool) = setup().await;
   1175 
   1176         let mut routine = async |first: &TxOutKind, second: &TxOutKind| {
   1177             let transfer_id = sqlx::query("SELECT count(*) + 1 FROM tx_out")
   1178                 .try_map(|r: PgRow| r.try_get(0))
   1179                 .fetch_one(&mut *db)
   1180                 .await
   1181                 .unwrap();
   1182             let now = now_sql_stable_ts();
   1183             let later = now + Span::new().hours(2);
   1184             let tx = TxOut {
   1185                 transfer_id,
   1186                 tx_id: Some(transfer_id),
   1187                 amount: decimal("10"),
   1188                 subject: "subject".to_owned(),
   1189                 creditor_id: 31000163100000000,
   1190                 creditor_name: "Name".into(),
   1191                 valued_at: now,
   1192             };
   1193             assert_matches!(
   1194                 db::make_transfer(
   1195                     &pool,
   1196                     &Transfer {
   1197                         request_uid: HashCode::rand(),
   1198                         amount: decimal("10"),
   1199                         exchange_base_url: url("https://exchange.test.com/"),
   1200                         metadata: None,
   1201                         wtid: ShortHashCode::rand(),
   1202                         creditor_id: 31000163100000000,
   1203                         creditor_name: "Name".into()
   1204                     },
   1205                     &now
   1206                 )
   1207                 .await
   1208                 .unwrap(),
   1209                 TransferResult::Success { .. }
   1210             );
   1211             db::initiated_submit_success(&mut db, 1, &Timestamp::now(), transfer_id)
   1212                 .await
   1213                 .expect("status success");
   1214 
   1215             // Insert
   1216             assert_eq!(
   1217                 db::register_tx_out(&mut db, &tx, first, &now)
   1218                     .await
   1219                     .expect("register tx out"),
   1220                 AddOutgoingResult {
   1221                     result: db::RegisterResult::known,
   1222                     row_id: transfer_id,
   1223                 }
   1224             );
   1225             // Idempotent
   1226             assert_eq!(
   1227                 db::register_tx_out(
   1228                     &mut db,
   1229                     &TxOut {
   1230                         valued_at: later,
   1231                         ..tx.clone()
   1232                     },
   1233                     first,
   1234                     &now
   1235                 )
   1236                 .await
   1237                 .expect("register tx out"),
   1238                 AddOutgoingResult {
   1239                     result: db::RegisterResult::idempotent,
   1240                     row_id: transfer_id,
   1241                 }
   1242             );
   1243             // Recovered
   1244             assert_eq!(
   1245                 db::register_tx_out(
   1246                     &mut db,
   1247                     &TxOut {
   1248                         transfer_id: transfer_id + 1,
   1249                         tx_id: Some(transfer_id + 1),
   1250                         valued_at: later,
   1251                         ..tx.clone()
   1252                     },
   1253                     second,
   1254                     &now
   1255                 )
   1256                 .await
   1257                 .expect("register tx out"),
   1258                 AddOutgoingResult {
   1259                     result: db::RegisterResult::recovered,
   1260                     row_id: transfer_id + 1,
   1261                 }
   1262             );
   1263         };
   1264 
   1265         // Empty db
   1266         assert_eq!(
   1267             db::outgoing_history(&pool, &History::default(), &CURR, &ROOT, dummy_listen)
   1268                 .await
   1269                 .unwrap(),
   1270             Vec::new()
   1271         );
   1272 
   1273         // Regular transaction
   1274         routine(&TxOutKind::Simple, &TxOutKind::Simple).await;
   1275 
   1276         // Talerable transaction
   1277         routine(
   1278             &TxOutKind::Talerable(OutgoingSubject::rand()),
   1279             &TxOutKind::Talerable(OutgoingSubject::rand()),
   1280         )
   1281         .await;
   1282 
   1283         // Bounced transaction
   1284         routine(&TxOutKind::Bounce(21), &TxOutKind::Bounce(42)).await;
   1285 
   1286         // History
   1287         assert_eq!(
   1288             db::outgoing_history(&pool, &History::default(), &CURR, &ROOT, dummy_listen)
   1289                 .await
   1290                 .unwrap()
   1291                 .len(),
   1292             2
   1293         );
   1294     }
   1295 
   1296     // TODO tx out failure
   1297 
   1298     #[tokio::test]
   1299     async fn transfer() {
   1300         let (_, pool) = setup().await;
   1301 
   1302         // Empty db
   1303         assert_eq!(
   1304             db::transfer_by_id(&pool, 0, &CURR, &ROOT).await.unwrap(),
   1305             None
   1306         );
   1307         assert_eq!(
   1308             db::transfer_page(&pool, &None, &CURR, &ROOT, &Page::default())
   1309                 .await
   1310                 .unwrap(),
   1311             Vec::new()
   1312         );
   1313 
   1314         let req = Transfer {
   1315             request_uid: HashCode::rand(),
   1316             amount: decimal("10"),
   1317             exchange_base_url: url("https://exchange.test.com/"),
   1318             metadata: None,
   1319             wtid: ShortHashCode::rand(),
   1320             creditor_id: 31000163100000000,
   1321             creditor_name: "Name".into(),
   1322         };
   1323         let now = now_sql_stable_ts();
   1324         let later = now + Span::new().hours(2);
   1325         // Insert
   1326         assert_eq!(
   1327             db::make_transfer(&pool, &req, &now)
   1328                 .await
   1329                 .expect("transfer"),
   1330             TransferResult::Success {
   1331                 id: 1,
   1332                 initiated_at: now
   1333             }
   1334         );
   1335         // Idempotent
   1336         assert_eq!(
   1337             db::make_transfer(&pool, &req, &later)
   1338                 .await
   1339                 .expect("transfer"),
   1340             TransferResult::Success {
   1341                 id: 1,
   1342                 initiated_at: now
   1343             }
   1344         );
   1345         // Request UID reuse
   1346         assert_eq!(
   1347             db::make_transfer(
   1348                 &pool,
   1349                 &Transfer {
   1350                     wtid: ShortHashCode::rand(),
   1351                     ..req.clone()
   1352                 },
   1353                 &now
   1354             )
   1355             .await
   1356             .expect("transfer"),
   1357             TransferResult::RequestUidReuse
   1358         );
   1359         // wtid reuse
   1360         assert_eq!(
   1361             db::make_transfer(
   1362                 &pool,
   1363                 &Transfer {
   1364                     request_uid: HashCode::rand(),
   1365                     ..req.clone()
   1366                 },
   1367                 &now
   1368             )
   1369             .await
   1370             .expect("transfer"),
   1371             TransferResult::WtidReuse
   1372         );
   1373         // Many
   1374         assert_eq!(
   1375             db::make_transfer(
   1376                 &pool,
   1377                 &Transfer {
   1378                     request_uid: HashCode::rand(),
   1379                     wtid: ShortHashCode::rand(),
   1380                     ..req
   1381                 },
   1382                 &later
   1383             )
   1384             .await
   1385             .expect("transfer"),
   1386             TransferResult::Success {
   1387                 id: 2,
   1388                 initiated_at: later
   1389             }
   1390         );
   1391 
   1392         // Get
   1393         assert!(
   1394             db::transfer_by_id(&pool, 1, &CURR, &ROOT)
   1395                 .await
   1396                 .unwrap()
   1397                 .is_some()
   1398         );
   1399         assert!(
   1400             db::transfer_by_id(&pool, 2, &CURR, &ROOT)
   1401                 .await
   1402                 .unwrap()
   1403                 .is_some()
   1404         );
   1405         assert!(
   1406             db::transfer_by_id(&pool, 3, &CURR, &ROOT)
   1407                 .await
   1408                 .unwrap()
   1409                 .is_none()
   1410         );
   1411         assert_eq!(
   1412             db::transfer_page(&pool, &None, &CURR, &ROOT, &Page::default())
   1413                 .await
   1414                 .unwrap()
   1415                 .len(),
   1416             2
   1417         );
   1418     }
   1419 
   1420     #[tokio::test]
   1421     async fn bounce() {
   1422         let (mut db, _) = setup().await;
   1423 
   1424         let amount = decimal("10");
   1425         let now = now_sql_stable_ts();
   1426 
   1427         // Bounce
   1428         assert_eq!(
   1429             db::register_bounced_tx_in(
   1430                 &mut db,
   1431                 &TxIn {
   1432                     transfer_id: 12,
   1433                     tx_id: None,
   1434                     amount,
   1435                     subject: "subject".to_owned(),
   1436                     debtor_id: 31000163100000000,
   1437                     debtor_name: "Name".into(),
   1438                     valued_at: now
   1439                 },
   1440                 22,
   1441                 "good reason",
   1442                 &now
   1443             )
   1444             .await
   1445             .expect("bounce"),
   1446             BounceResult {
   1447                 tx_id: 1,
   1448                 tx_new: true
   1449             }
   1450         );
   1451         // Idempotent
   1452         assert_eq!(
   1453             db::register_bounced_tx_in(
   1454                 &mut db,
   1455                 &TxIn {
   1456                     transfer_id: 12,
   1457                     tx_id: None,
   1458                     amount,
   1459                     subject: "subject".to_owned(),
   1460                     debtor_id: 31000163100000000,
   1461                     debtor_name: "Name".into(),
   1462                     valued_at: now
   1463                 },
   1464                 22,
   1465                 "good reason",
   1466                 &now
   1467             )
   1468             .await
   1469             .expect("bounce"),
   1470             BounceResult {
   1471                 tx_id: 1,
   1472                 tx_new: false
   1473             }
   1474         );
   1475 
   1476         // Many
   1477         assert_eq!(
   1478             db::register_bounced_tx_in(
   1479                 &mut db,
   1480                 &TxIn {
   1481                     transfer_id: 13,
   1482                     tx_id: None,
   1483                     amount,
   1484                     subject: "subject".to_owned(),
   1485                     debtor_id: 31000163100000000,
   1486                     debtor_name: "Name".into(),
   1487                     valued_at: now
   1488                 },
   1489                 23,
   1490                 "good reason",
   1491                 &now
   1492             )
   1493             .await
   1494             .expect("bounce"),
   1495             BounceResult {
   1496                 tx_id: 2,
   1497                 tx_new: true
   1498             }
   1499         );
   1500     }
   1501 
   1502     #[tokio::test]
   1503     async fn status() {
   1504         let (mut db, pool) = setup().await;
   1505 
   1506         let check_status = async |id: u64, status: TransferState, msg: Option<&str>| {
   1507             let transfer = db::transfer_by_id(&pool, id, &CURR, &ROOT)
   1508                 .await
   1509                 .unwrap()
   1510                 .unwrap();
   1511             assert_eq!(
   1512                 (status, msg),
   1513                 (transfer.status, transfer.status_msg.as_deref())
   1514             );
   1515         };
   1516 
   1517         // Unknown transfer
   1518         db::initiated_submit_permanent_failure(&mut db, 1, "msg")
   1519             .await
   1520             .unwrap();
   1521         db::initiated_submit_success(&mut db, 1, &Timestamp::now(), 12)
   1522             .await
   1523             .unwrap();
   1524         assert_eq!(
   1525             db::initiated_chargeback_failure(&mut db, 1).await.unwrap(),
   1526             ChargebackFailureResult::Unknown
   1527         );
   1528 
   1529         // Failure
   1530         db::make_transfer(
   1531             &pool,
   1532             &Transfer {
   1533                 request_uid: HashCode::rand(),
   1534                 amount: decimal("1"),
   1535                 exchange_base_url: url("https://exchange.test.com/"),
   1536                 metadata: None,
   1537                 wtid: ShortHashCode::rand(),
   1538                 creditor_id: 31000163100000000,
   1539                 creditor_name: "Name".into(),
   1540             },
   1541             &Timestamp::now(),
   1542         )
   1543         .await
   1544         .expect("transfer");
   1545         check_status(1, TransferState::pending, None).await;
   1546         db::initiated_submit_permanent_failure(&mut db, 1, "error status")
   1547             .await
   1548             .unwrap();
   1549         check_status(1, TransferState::permanent_failure, Some("error status")).await;
   1550 
   1551         // Success
   1552         db::make_transfer(
   1553             &pool,
   1554             &Transfer {
   1555                 request_uid: HashCode::rand(),
   1556                 amount: decimal("1"),
   1557                 exchange_base_url: url("https://exchange.test.com/"),
   1558                 metadata: None,
   1559                 wtid: ShortHashCode::rand(),
   1560                 creditor_id: 31000163100000000,
   1561                 creditor_name: "Name".into(),
   1562             },
   1563             &Timestamp::now(),
   1564         )
   1565         .await
   1566         .expect("transfer");
   1567         check_status(2, TransferState::pending, None).await;
   1568         db::initiated_submit_success(&mut db, 2, &Timestamp::now(), 3)
   1569             .await
   1570             .unwrap();
   1571         check_status(2, TransferState::pending, None).await;
   1572         db::register_tx_out(
   1573             &mut db,
   1574             &TxOut {
   1575                 transfer_id: 5,
   1576                 tx_id: Some(3),
   1577                 amount: decimal("2"),
   1578                 subject: "".to_string(),
   1579                 creditor_id: 31000163100000000,
   1580                 creditor_name: "Name".into(),
   1581                 valued_at: Timestamp::now(),
   1582             },
   1583             &TxOutKind::Simple,
   1584             &Timestamp::now(),
   1585         )
   1586         .await
   1587         .unwrap();
   1588         check_status(2, TransferState::success, None).await;
   1589 
   1590         // Chargeback
   1591         assert_eq!(
   1592             db::initiated_chargeback_failure(&mut db, 5).await.unwrap(),
   1593             ChargebackFailureResult::Known(2)
   1594         );
   1595         check_status(2, TransferState::late_failure, Some("charged back")).await;
   1596         assert_eq!(
   1597             db::initiated_chargeback_failure(&mut db, 5).await.unwrap(),
   1598             ChargebackFailureResult::Idempotent(2)
   1599         );
   1600     }
   1601 
   1602     #[tokio::test]
   1603     async fn batch() {
   1604         let (mut db, pool) = setup().await;
   1605         let start = Timestamp::now();
   1606 
   1607         // Empty db
   1608         let pendings = db::pending_batch(&mut db, &start)
   1609             .await
   1610             .expect("pending_batch");
   1611         assert_eq!(pendings.len(), 0);
   1612 
   1613         // Some transfers
   1614         for i in 0..3 {
   1615             db::make_transfer(
   1616                 &pool,
   1617                 &Transfer {
   1618                     request_uid: HashCode::rand(),
   1619                     amount: decimal(format!("{}", i + 1)),
   1620                     exchange_base_url: url("https://exchange.test.com/"),
   1621                     metadata: None,
   1622                     wtid: ShortHashCode::rand(),
   1623                     creditor_id: 31000163100000000,
   1624                     creditor_name: "Name".into(),
   1625                 },
   1626                 &Timestamp::now(),
   1627             )
   1628             .await
   1629             .expect("transfer");
   1630         }
   1631         let pendings = db::pending_batch(&mut db, &start)
   1632             .await
   1633             .expect("pending_batch");
   1634         assert_eq!(pendings.len(), 3);
   1635 
   1636         // Max 100 txs in batch
   1637         for i in 0..100 {
   1638             db::make_transfer(
   1639                 &pool,
   1640                 &Transfer {
   1641                     request_uid: HashCode::rand(),
   1642                     amount: decimal(format!("{}", i + 1)),
   1643                     exchange_base_url: url("https://exchange.test.com/"),
   1644                     metadata: None,
   1645                     wtid: ShortHashCode::rand(),
   1646                     creditor_id: 31000163100000000,
   1647                     creditor_name: "Name".into(),
   1648                 },
   1649                 &Timestamp::now(),
   1650             )
   1651             .await
   1652             .expect("transfer");
   1653         }
   1654         let pendings = db::pending_batch(&mut db, &start)
   1655             .await
   1656             .expect("pending_batch");
   1657         assert_eq!(pendings.len(), 100);
   1658 
   1659         // Skip uploaded
   1660         for i in 0..=10 {
   1661             db::initiated_submit_success(&mut db, i, &Timestamp::now(), i)
   1662                 .await
   1663                 .expect("status success");
   1664         }
   1665         let pendings = db::pending_batch(&mut db, &start)
   1666             .await
   1667             .expect("pending_batch");
   1668         assert_eq!(pendings.len(), 93);
   1669 
   1670         // Skip failed
   1671         for i in 0..=10 {
   1672             db::initiated_submit_permanent_failure(&mut db, 10 + i, "failure")
   1673                 .await
   1674                 .expect("status failure");
   1675         }
   1676         let pendings = db::pending_batch(&mut db, &start)
   1677             .await
   1678             .expect("pending_batch");
   1679         assert_eq!(pendings.len(), 83);
   1680     }
   1681 }