taler-rust

GNU Taler code in Rust. Largely core banking integrations.
Log | Files | Refs | Submodules | README | LICENSE

db.rs (49090B)


      1 /*
      2   This file is part of TALER
      3   Copyright (C) 2025, 2026 Taler Systems SA
      4 
      5   TALER is free software; you can redistribute it and/or modify it under the
      6   terms of the GNU Affero General Public License as published by the Free Software
      7   Foundation; either version 3, or (at your option) any later version.
      8 
      9   TALER is distributed in the hope that it will be useful, but WITHOUT ANY
     10   WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
     11   A PARTICULAR PURPOSE.  See the GNU Affero General Public License for more details.
     12 
     13   You should have received a copy of the GNU Affero General Public License along with
     14   TALER; see the file COPYING.  If not, see <http://www.gnu.org/licenses/>
     15 */
     16 
     17 use std::fmt::Display;
     18 
     19 use compact_str::CompactString;
     20 use jiff::Timestamp;
     21 use serde::{Serialize, de::DeserializeOwned};
     22 use sqlx::{PgConnection, PgPool, QueryBuilder, Row, postgres::PgRow};
     23 use taler_api::{
     24     db::{BindHelper, TypeHelper, history, page},
     25     serialized,
     26     subject::{IncomingSubject, OutgoingSubject, fmt_out_subject},
     27 };
     28 use taler_common::{
     29     api::{
     30         HashCode, ShortHashCode,
     31         params::{History, Page},
     32         prepared::{RegistrationRequest, Unregistration},
     33         revenue::RevenueIncomingBankTransaction,
     34         wire::{
     35             IncomingBankTransaction, OutgoingBankTransaction, TransferListStatus, TransferState,
     36             TransferStatus,
     37         },
     38     },
     39     config::Config,
     40     db::IncomingType,
     41     types::{
     42         amount::{Currency, Decimal},
     43         payto::{PaytoImpl as _, PaytoURI},
     44     },
     45 };
     46 use tokio::sync::watch::{Receiver, Sender};
     47 use url::Url;
     48 
     49 use crate::{
     50     config::parse_db_cfg,
     51     payto::{CyclosAccount, CyclosId, FullCyclosPayto},
     52 };
     53 
/// Schema name passed to `taler_common::db::pool` and `taler_common::db::dbinit` below.
const SCHEMA: &str = "cyclos";
     55 
     56 pub async fn pool(cfg: &Config) -> anyhow::Result<PgPool> {
     57     let db = parse_db_cfg(cfg)?;
     58     let pool = taler_common::db::pool(db.cfg, SCHEMA).await?;
     59     Ok(pool)
     60 }
     61 
     62 pub async fn dbinit(cfg: &Config, reset: bool) -> anyhow::Result<PgPool> {
     63     let db_cfg = parse_db_cfg(cfg)?;
     64     let pool = taler_common::db::pool(db_cfg.cfg, SCHEMA).await?;
     65     let mut db = pool.acquire().await?;
     66     taler_common::db::dbinit(&mut db, db_cfg.sql_dir.as_ref(), SCHEMA, reset).await?;
     67     Ok(pool)
     68 }
     69 
/// Publish row ids received through database notifications on the given watch channels.
///
/// Each notification name registered below carries an `i64` row id which is stored in the
/// matching channel with `send_replace`:
/// - `tx_in` -> `in_channel`
/// - `taler_in` -> `taler_in_channel`
/// - `tx_out` -> `out_channel`
/// - `taler_out` -> `taler_out_channel`
///
/// NOTE(review): the listening mechanism itself is implemented by
/// `taler_api::notification::notification_listener!` and is not visible in this file.
pub async fn notification_listener(
    pool: PgPool,
    in_channel: Sender<i64>,
    taler_in_channel: Sender<i64>,
    out_channel: Sender<i64>,
    taler_out_channel: Sender<i64>,
) {
    taler_api::notification::notification_listener!(&pool,
        "tx_in" => (row_id: i64) {
            in_channel.send_replace(row_id);
        },
        "taler_in" => (row_id: i64) {
            taler_in_channel.send_replace(row_id);
        },
        "tx_out" => (row_id: i64) {
            out_channel.send_replace(row_id);
        },
        "taler_out" => (row_id: i64) {
            taler_out_channel.send_replace(row_id);
        }
    )
}
     92 
/// Incoming transaction, as handed to `register_tx_in` and `register_bounced_tx_in`.
///
/// The `Display` impl renders all fields on a single line.
#[derive(Debug, Clone)]
pub struct TxIn {
    pub transfer_id: i64,
    /// Optional; when present, `Display` appends it to `transfer_id` as `:{tx_id}`.
    pub tx_id: Option<i64>,
    pub amount: Decimal,
    pub subject: String,
    pub debtor_id: i64,
    pub debtor_name: CompactString,
    pub valued_at: Timestamp,
}
    103 
    104 impl Display for TxIn {
    105     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
    106         let Self {
    107             transfer_id,
    108             tx_id,
    109             amount,
    110             subject,
    111             valued_at,
    112             debtor_id,
    113             debtor_name,
    114         } = self;
    115         let tx_id = match tx_id {
    116             Some(id) => format_args!(":{}", *id),
    117             None => format_args!(""),
    118         };
    119         write!(
    120             f,
    121             "{valued_at} {transfer_id}{tx_id} {amount} ({debtor_id} {debtor_name}) '{subject}'"
    122         )
    123     }
    124 }
    125 
/// Outgoing transaction, as handed to `register_tx_out`.
///
/// The `Display` impl renders all fields on a single line.
#[derive(Debug, Clone)]
pub struct TxOut {
    pub transfer_id: i64,
    /// Optional; when present, `Display` appends it to `transfer_id` as `:{tx_id}`.
    pub tx_id: Option<i64>,
    pub amount: Decimal,
    pub subject: String,
    pub creditor_id: i64,
    pub creditor_name: CompactString,
    pub valued_at: Timestamp,
}
    136 
    137 impl Display for TxOut {
    138     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
    139         let Self {
    140             transfer_id,
    141             tx_id,
    142             amount,
    143             subject,
    144             creditor_id,
    145             creditor_name,
    146             valued_at,
    147         } = self;
    148         let tx_id = match tx_id {
    149             Some(id) => format_args!(":{}", *id),
    150             None => format_args!(""),
    151         };
    152         write!(
    153             f,
    154             "{valued_at} {transfer_id}{tx_id} {amount} ({creditor_id} {creditor_name}) '{subject}'"
    155         )
    156     }
    157 }
    158 
/// Initiated transaction as returned by `pending_batch`.
///
/// The fields are decoded, in order, from the `initiated_id`, `amount`, `subject`,
/// `credit_account` and `credit_name` columns of the `initiated` table.
#[derive(Debug, PartialEq, Eq)]
pub struct Initiated {
    pub id: i64,
    pub amount: Decimal,
    pub subject: String,
    pub creditor_id: i64,
    pub creditor_name: CompactString,
}
    167 
    168 impl Display for Initiated {
    169     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
    170         let Self {
    171             id,
    172             amount,
    173             subject,
    174             creditor_id,
    175             creditor_name,
    176         } = self;
    177         write!(
    178             f,
    179             "{id} {amount} ({creditor_id} {creditor_name}) '{subject}'"
    180         )
    181     }
    182 }
    183 
/// Incoming transaction description consumed by `register_tx_in_admin`.
///
/// Unlike [`TxIn`] it carries no transfer id, transaction id or valuation time.
#[derive(Debug, Clone)]
pub struct TxInAdmin {
    pub amount: Decimal,
    pub subject: String,
    pub debtor_id: i64,
    pub debtor_name: CompactString,
    /// Its `ty()` and `key()` values are bound as the last two parameters of the SQL call.
    pub metadata: IncomingSubject,
}
    192 
/// Outcome of `register_tx_in` and `register_tx_in_admin`.
#[derive(Debug, PartialEq, Eq)]
pub enum AddIncomingResult {
    /// None of the failure flags was set; the fields are decoded from the
    /// `out_new`, `out_pending`, `out_tx_row_id` and `out_valued_at` columns.
    Success {
        new: bool,
        pending: bool,
        row_id: u64,
        valued_at: Timestamp,
    },
    /// The `out_reserve_pub_reuse` flag was set.
    ReservePubReuse,
    /// The `out_unknown_mapping` flag was set.
    UnknownMapping,
    /// The `out_mapping_reuse` flag was set.
    MappingReuse,
}
    205 
    206 /// Lock the database for worker execution
    207 pub async fn worker_lock(e: &mut PgConnection) -> sqlx::Result<bool> {
    208     sqlx::query("SELECT pg_try_advisory_lock(42)")
    209         .try_map(|r: PgRow| r.try_get(0))
    210         .fetch_one(e)
    211         .await
    212 }
    213 
    214 pub async fn register_tx_in_admin(
    215     db: &PgPool,
    216     tx: &TxInAdmin,
    217     now: &Timestamp,
    218 ) -> sqlx::Result<AddIncomingResult> {
    219     serialized!(
    220         sqlx::query(
    221             "
    222             SELECT out_reserve_pub_reuse, out_mapping_reuse, out_unknown_mapping, out_tx_row_id, out_valued_at, out_new, out_pending
    223             FROM register_tx_in(NULL, NULL, $1, $2, $3, $4, $5, $6, $7, $5)
    224         ",
    225         )
    226         .bind(tx.amount)
    227         .bind(&tx.subject)
    228         .bind(tx.debtor_id)
    229         .bind(&tx.debtor_name)
    230         .bind_timestamp(now)
    231         .bind(tx.metadata.ty())
    232         .bind(tx.metadata.key())
    233        .try_map(|r: PgRow| {
    234             Ok(if r.try_get_flag(0)? {
    235                 AddIncomingResult::ReservePubReuse
    236             } else if r.try_get_flag(1)? {
    237                 AddIncomingResult::MappingReuse
    238             } else if r.try_get_flag(2)? {
    239                 AddIncomingResult::UnknownMapping
    240             } else {
    241                 AddIncomingResult::Success {
    242                     row_id: r.try_get_u64(3)?,
    243                     valued_at: r.try_get_timestamp(4)?,
    244                     new: r.try_get(5)?,
    245                     pending: r.try_get(6)?
    246                 }
    247             })
    248         })
    249         .fetch_one(db)
    250     )
    251 }
    252 
    253 pub async fn register_tx_in(
    254     db: &mut PgConnection,
    255     tx: &TxIn,
    256     subject: &Option<IncomingSubject>,
    257     now: &Timestamp,
    258 ) -> sqlx::Result<AddIncomingResult> {
    259     serialized!(
    260         sqlx::query(
    261             "
    262             SELECT out_reserve_pub_reuse, out_mapping_reuse, out_unknown_mapping, out_tx_row_id, out_valued_at, out_new, out_pending
    263             FROM register_tx_in($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
    264         ",
    265         )
    266         .bind(tx.transfer_id)
    267         .bind(tx.tx_id)
    268         .bind(tx.amount)
    269         .bind(&tx.subject)
    270         .bind(tx.debtor_id)
    271         .bind(&tx.debtor_name)
    272         .bind(tx.valued_at.as_microsecond())
    273         .bind(subject.as_ref().map(|it| it.ty()))
    274         .bind(subject.as_ref().map(|it| it.key()))
    275         .bind(now.as_microsecond())
    276         .try_map(|r: PgRow| {
    277             Ok(if r.try_get_flag(0)? {
    278                 AddIncomingResult::ReservePubReuse
    279             } else if r.try_get_flag(1)? {
    280                 AddIncomingResult::MappingReuse
    281             } else if r.try_get_flag(2)? {
    282                 AddIncomingResult::UnknownMapping
    283             } else {
    284                 AddIncomingResult::Success {
    285                     row_id: r.try_get_u64(3)?,
    286                     valued_at: r.try_get_timestamp(4)?,
    287                     new: r.try_get(5)?,
    288                     pending: r.try_get(6)?
    289                 }
    290             })
    291         })
    292         .fetch_one(&mut *db)
    293     )
    294 }
    295 
/// Selects which of the four optional parameters `register_tx_out` binds.
#[derive(Debug)]
pub enum TxOutKind {
    /// All four optional parameters are bound as `NULL`.
    Simple,
    /// Only the last optional parameter is bound, to the wrapped value.
    Bounce(i64),
    /// `wtid`, `exchange_base_url` and `metadata` are bound from the subject; the last
    /// optional parameter is `NULL`.
    Talerable(OutgoingSubject),
}
    302 
/// Value of the `out_result` column returned by `register_tx_out`.
///
/// Mapped by `sqlx` to the SQL type named `register_result`.
/// NOTE(review): the lower-case variant names presumably mirror the SQL enum labels,
/// hence the `non_camel_case_types` allowance — confirm against the schema.
#[derive(Debug, Clone, Copy, PartialEq, Eq, sqlx::Type)]
#[allow(non_camel_case_types)]
#[sqlx(type_name = "register_result")]
pub enum RegisterResult {
    /// Already registered
    idempotent,
    /// Initiated transaction
    known,
    /// Recovered unknown outgoing transaction
    recovered,
}
    314 
/// Outcome of `register_tx_out`.
#[derive(Debug, PartialEq, Eq)]
pub struct AddOutgoingResult {
    /// Decoded from the `out_result` column.
    pub result: RegisterResult,
    /// Decoded from the `out_tx_row_id` column.
    pub row_id: i64,
}
    320 
    321 pub async fn register_tx_out(
    322     db: &mut PgConnection,
    323     tx: &TxOut,
    324     kind: &TxOutKind,
    325     now: &Timestamp,
    326 ) -> sqlx::Result<AddOutgoingResult> {
    327     serialized!({
    328         let query = sqlx::query(
    329             "
    330                 SELECT out_result, out_tx_row_id
    331                 FROM register_tx_out($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)
    332             ",
    333         )
    334         .bind(tx.transfer_id)
    335         .bind(tx.tx_id)
    336         .bind(tx.amount)
    337         .bind(&tx.subject)
    338         .bind(tx.creditor_id)
    339         .bind(&tx.creditor_name)
    340         .bind_timestamp(&tx.valued_at);
    341         let query = match kind {
    342             TxOutKind::Simple => query
    343                 .bind(None::<&[u8]>)
    344                 .bind(None::<&str>)
    345                 .bind(None::<&str>)
    346                 .bind(None::<i64>),
    347             TxOutKind::Bounce(bounced) => query
    348                 .bind(None::<&[u8]>)
    349                 .bind(None::<&str>)
    350                 .bind(None::<&str>)
    351                 .bind(*bounced),
    352             TxOutKind::Talerable(subject) => query
    353                 .bind(&subject.wtid)
    354                 .bind(subject.exchange_base_url.as_str())
    355                 .bind(&subject.metadata)
    356                 .bind(None::<i64>),
    357         };
    358         query
    359             .bind_timestamp(now)
    360             .try_map(|r: PgRow| {
    361                 Ok(AddOutgoingResult {
    362                     result: r.try_get(0)?,
    363                     row_id: r.try_get(1)?,
    364                 })
    365             })
    366             .fetch_one(&mut *db)
    367     })
    368 }
    369 
/// Outcome of `make_transfer`.
#[derive(Debug, PartialEq, Eq)]
pub enum TransferResult {
    /// Neither reuse flag was set; `id` and `initiated_at` are decoded from the
    /// `out_initiated_row_id` and `out_initiated_at` columns.
    Success { id: u64, initiated_at: Timestamp },
    /// The `out_request_uid_reuse` flag was set.
    RequestUidReuse,
    /// The `out_wtid_reuse` flag was set.
    WtidReuse,
}
    376 
/// Transfer request consumed by `make_transfer`.
///
/// `wtid`, `exchange_base_url` and `metadata` are also used to format the outgoing
/// subject with `fmt_out_subject`.
#[derive(Debug, Clone)]
pub struct Transfer {
    pub request_uid: HashCode,
    pub amount: Decimal,
    pub exchange_base_url: Url,
    pub metadata: Option<CompactString>,
    pub wtid: ShortHashCode,
    pub creditor_id: i64,
    pub creditor_name: CompactString,
}
    387 
    388 pub async fn make_transfer(
    389     db: &PgPool,
    390     tx: &Transfer,
    391     now: &Timestamp,
    392 ) -> sqlx::Result<TransferResult> {
    393     let subject = fmt_out_subject(&tx.wtid, &tx.exchange_base_url, tx.metadata.as_deref());
    394     serialized!(
    395         sqlx::query(
    396             "
    397                 SELECT out_request_uid_reuse, out_wtid_reuse, out_initiated_row_id, out_initiated_at
    398                 FROM taler_transfer($1, $2, $3, $4, $5, $6, $7, $8, $9)
    399             ",
    400         )
    401         .bind(&tx.request_uid)
    402         .bind(&tx.wtid)
    403         .bind(&subject)
    404         .bind(tx.amount)
    405         .bind(tx.exchange_base_url.as_str())
    406         .bind(&tx.metadata)
    407         .bind(tx.creditor_id)
    408         .bind(&tx.creditor_name)
    409         .bind_timestamp(now)
    410         .try_map(|r: PgRow| {
    411             Ok(if r.try_get_flag(0)? {
    412                 TransferResult::RequestUidReuse
    413             } else if r.try_get_flag(1)? {
    414                 TransferResult::WtidReuse
    415             } else {
    416                 TransferResult::Success {
    417                     id: r.try_get_u64(2)?,
    418                     initiated_at: r.try_get_timestamp(3)?,
    419                 }
    420             })
    421         })
    422         .fetch_one(db)
    423     )
    424 }
    425 
/// Outcome of `register_bounced_tx_in`.
#[derive(Debug, PartialEq, Eq)]
pub struct BounceResult {
    /// Decoded from the `out_tx_row_id` column.
    pub tx_id: u64,
    /// Decoded from the `out_tx_new` column.
    pub tx_new: bool,
}
    431 
    432 pub async fn register_bounced_tx_in(
    433     db: &mut PgConnection,
    434     tx: &TxIn,
    435     chargeback_id: i64,
    436     reason: &str,
    437     now: &Timestamp,
    438 ) -> sqlx::Result<BounceResult> {
    439     serialized!(
    440         sqlx::query(
    441             "
    442                 SELECT out_tx_row_id, out_tx_new
    443                 FROM register_bounced_tx_in($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
    444             ",
    445         )
    446         .bind(tx.transfer_id)
    447         .bind(tx.tx_id)
    448         .bind(tx.amount)
    449         .bind(&tx.subject)
    450         .bind(tx.debtor_id)
    451         .bind(&tx.debtor_name)
    452         .bind_timestamp(&tx.valued_at)
    453         .bind(chargeback_id)
    454         .bind(reason)
    455         .bind_timestamp(now)
    456         .try_map(|r: PgRow| {
    457             Ok(BounceResult {
    458                 tx_id: r.try_get_u64(0)?,
    459                 tx_new: r.try_get(1)?,
    460             })
    461         })
    462         .fetch_one(&mut *db)
    463     )
    464 }
    465 
    466 pub async fn transfer_page(
    467     db: &PgPool,
    468     status: &Option<TransferState>,
    469     currency: &Currency,
    470     root: &CompactString,
    471     params: &Page,
    472 ) -> sqlx::Result<Vec<TransferListStatus>> {
    473     page(
    474         db,
    475         params,
    476         "initiated_id",
    477         || {
    478             let mut builder = QueryBuilder::new(
    479                 "
    480                     SELECT
    481                         initiated_id,
    482                         status,
    483                         amount,
    484                         credit_account,
    485                         credit_name,
    486                         initiated_at
    487                     FROM transfer
    488                     JOIN initiated USING (initiated_id)
    489                     WHERE
    490                 ",
    491             );
    492             if let Some(status) = status {
    493                 builder.push(" status = ").push_bind(status).push(" AND ");
    494             }
    495             builder
    496         },
    497         |r: PgRow| {
    498             Ok(TransferListStatus {
    499                 row_id: r.try_get_u64(0)?,
    500                 status: r.try_get(1)?,
    501                 amount: r.try_get_amount(2, currency)?,
    502                 credit_account: r.try_get_cyclos_fullpaytouri(3, 4, root)?,
    503                 timestamp: r.try_get_timestamp(5)?.into(),
    504             })
    505         },
    506     )
    507     .await
    508 }
    509 
    510 pub async fn outgoing_history(
    511     db: &PgPool,
    512     params: &History,
    513     currency: &Currency,
    514     root: &CompactString,
    515     listen: impl FnOnce() -> Receiver<i64>,
    516 ) -> sqlx::Result<Vec<OutgoingBankTransaction>> {
    517     history(
    518         db,
    519         "tx_out_id",
    520         params,
    521         listen,
    522         || {
    523             QueryBuilder::new(
    524                 "
    525                 SELECT
    526                     tx_out_id,
    527                     amount,
    528                     credit_account,
    529                     credit_name,
    530                     valued_at,
    531                     exchange_base_url,
    532                     metadata,
    533                     wtid
    534                 FROM taler_out
    535                 JOIN tx_out USING (tx_out_id)
    536                 WHERE
    537             ",
    538             )
    539         },
    540         |r: PgRow| {
    541             Ok(OutgoingBankTransaction {
    542                 row_id: r.try_get_u64(0)?,
    543                 amount: r.try_get_amount(1, currency)?,
    544                 debit_fee: None,
    545                 credit_account: r.try_get_cyclos_fullpaytouri(2, 3, root)?,
    546                 date: r.try_get_timestamp(4)?.into(),
    547                 exchange_base_url: r.try_get_url(5)?,
    548                 metadata: r.try_get(6)?,
    549                 wtid: r.try_get(7)?,
    550             })
    551         },
    552     )
    553     .await
    554 }
    555 
    556 pub async fn incoming_history(
    557     db: &PgPool,
    558     params: &History,
    559     currency: &Currency,
    560     root: &CompactString,
    561     listen: impl FnOnce() -> Receiver<i64>,
    562 ) -> sqlx::Result<Vec<IncomingBankTransaction>> {
    563     history(
    564         db,
    565         "tx_in_id",
    566         params,
    567         listen,
    568         || {
    569             QueryBuilder::new(
    570                 "
    571                 SELECT
    572                     type,
    573                     tx_in_id,
    574                     amount,
    575                     debit_account,
    576                     debit_name,
    577                     valued_at,
    578                     metadata,
    579                     authorization_pub,
    580                     authorization_sig
    581                 FROM taler_in
    582                 JOIN tx_in USING (tx_in_id)
    583                 WHERE
    584             ",
    585             )
    586         },
    587         |r: PgRow| {
    588             Ok(match r.try_get(0)? {
    589                 IncomingType::reserve => IncomingBankTransaction::Reserve {
    590                     row_id: r.try_get_u64(1)?,
    591                     amount: r.try_get_amount(2, currency)?,
    592                     credit_fee: None,
    593                     debit_account: r.try_get_cyclos_fullpaytouri(3, 4, root)?,
    594                     date: r.try_get_timestamp(5)?.into(),
    595                     reserve_pub: r.try_get(6)?,
    596                     authorization_pub: r.try_get(7)?,
    597                     authorization_sig: r.try_get(8)?,
    598                 },
    599                 IncomingType::kyc => IncomingBankTransaction::Kyc {
    600                     row_id: r.try_get_u64(1)?,
    601                     amount: r.try_get_amount(2, currency)?,
    602                     credit_fee: None,
    603                     debit_account: r.try_get_cyclos_fullpaytouri(3, 4, root)?,
    604                     date: r.try_get_timestamp(5)?.into(),
    605                     account_pub: r.try_get(6)?,
    606                     authorization_pub: r.try_get(7)?,
    607                     authorization_sig: r.try_get(8)?,
    608                 },
    609                 IncomingType::map => unimplemented!("MAP are never listed in the history"),
    610             })
    611         },
    612     )
    613     .await
    614 }
    615 
    616 pub async fn revenue_history(
    617     db: &PgPool,
    618     params: &History,
    619     currency: &Currency,
    620     root: &CompactString,
    621     listen: impl FnOnce() -> Receiver<i64>,
    622 ) -> sqlx::Result<Vec<RevenueIncomingBankTransaction>> {
    623     history(
    624         db,
    625         "tx_in_id",
    626         params,
    627         listen,
    628         || {
    629             QueryBuilder::new(
    630                 "
    631                 SELECT
    632                     tx_in_id,
    633                     valued_at,
    634                     amount,
    635                     debit_account,
    636                     debit_name,
    637                     subject
    638                 FROM tx_in
    639                 WHERE
    640             ",
    641             )
    642         },
    643         |r: PgRow| {
    644             Ok(RevenueIncomingBankTransaction {
    645                 row_id: r.try_get_u64(0)?,
    646                 date: r.try_get_timestamp(1)?.into(),
    647                 amount: r.try_get_amount(2, currency)?,
    648                 credit_fee: None,
    649                 debit_account: r.try_get_cyclos_fullpaytouri(3, 4, root)?,
    650                 subject: r.try_get(5)?,
    651             })
    652         },
    653     )
    654     .await
    655 }
    656 
    657 pub async fn transfer_by_id(
    658     db: &PgPool,
    659     id: u64,
    660     currency: &Currency,
    661     root: &CompactString,
    662 ) -> sqlx::Result<Option<TransferStatus>> {
    663     serialized!(
    664         sqlx::query(
    665             "
    666                 SELECT
    667                     status,
    668                     status_msg,
    669                     amount,
    670                     exchange_base_url,
    671                     metadata,
    672                     wtid,
    673                     credit_account,
    674                     credit_name,
    675                     initiated_at
    676                 FROM transfer
    677                 JOIN initiated USING (initiated_id)
    678                 WHERE initiated_id = $1
    679             ",
    680         )
    681         .bind(id as i64)
    682         .try_map(|r: PgRow| {
    683             Ok(TransferStatus {
    684                 status: r.try_get(0)?,
    685                 status_msg: r.try_get(1)?,
    686                 amount: r.try_get_amount(2, currency)?,
    687                 origin_exchange_url: r.try_get(3)?,
    688                 metadata: r.try_get(4)?,
    689                 wtid: r.try_get(5)?,
    690                 credit_account: r.try_get_cyclos_fullpaytouri(6, 7, root)?,
    691                 timestamp: r.try_get_timestamp(8)?.into(),
    692             })
    693         })
    694         .fetch_optional(db)
    695     )
    696 }
    697 
    698 /** Get a batch of pending initiated transactions not attempted since [start] */
    699 pub async fn pending_batch(
    700     db: &mut PgConnection,
    701     start: &Timestamp,
    702 ) -> sqlx::Result<Vec<Initiated>> {
    703     serialized!(
    704         sqlx::query(
    705             "
    706             SELECT initiated_id, amount, subject, credit_account, credit_name
    707             FROM initiated
    708             WHERE tx_id IS NULL
    709                 AND status='pending'
    710                 AND (last_submitted IS NULL OR last_submitted < $1)
    711             LIMIT 100
    712         ",
    713         )
    714         .bind_timestamp(start)
    715         .try_map(|r: PgRow| {
    716             Ok(Initiated {
    717                 id: r.try_get(0)?,
    718                 amount: r.try_get(1)?,
    719                 subject: r.try_get(2)?,
    720                 creditor_id: r.try_get(3)?,
    721                 creditor_name: r.try_get(4)?,
    722             })
    723         })
    724         .fetch_all(&mut *db)
    725     )
    726 }
    727 
    728 /** Update status of a successful submitted initiated transaction */
    729 pub async fn initiated_submit_success(
    730     db: &mut PgConnection,
    731     initiated_id: i64,
    732     timestamp: &Timestamp,
    733     tx_id: i64,
    734 ) -> sqlx::Result<()> {
    735     serialized!(
    736         sqlx::query(
    737             "
    738                 UPDATE initiated
    739                 SET status='pending', submission_counter=submission_counter+1, last_submitted=$1, tx_id=$2
    740                 WHERE initiated_id=$3
    741             "
    742         )
    743         .bind_timestamp(timestamp)
    744         .bind(tx_id)
    745         .bind(initiated_id)
    746         .execute(&mut *db)
    747     )?;
    748     Ok(())
    749 }
    750 
    751 /** Update status of a permanently failed initiated transaction */
    752 pub async fn initiated_submit_permanent_failure(
    753     db: &mut PgConnection,
    754     initiated_id: i64,
    755     msg: &str,
    756 ) -> sqlx::Result<()> {
    757     serialized!(
    758         sqlx::query(
    759             "
    760                 UPDATE initiated
    761                 SET status='permanent_failure', status_msg=$1
    762                 WHERE initiated_id=$2
    763             ",
    764         )
    765         .bind(msg)
    766         .bind(initiated_id)
    767         .execute(&mut *db)
    768     )?;
    769     Ok(())
    770 }
    771 
/// Outcome of `initiated_chargeback_failure`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ChargebackFailureResult {
    /// No row was returned, or the returned `out_initiated_id` was `0`.
    Unknown,
    /// `out_new` was true; carries the returned `out_initiated_id`.
    Known(u64),
    /// `out_new` was false; carries the returned `out_initiated_id`.
    Idempotent(u64),
}
    778 
    779 /** Update status of a charged back initiated transaction */
    780 pub async fn initiated_chargeback_failure(
    781     db: &mut PgConnection,
    782     transfer_id: i64,
    783 ) -> sqlx::Result<ChargebackFailureResult> {
    784     Ok(serialized!(
    785         sqlx::query("SELECT out_initiated_id, out_new FROM register_charge_back_failure($1)")
    786             .bind(transfer_id)
    787             .try_map(|r: PgRow| {
    788                 let id = r.try_get_u64(0)?;
    789                 Ok(if id == 0 {
    790                     ChargebackFailureResult::Unknown
    791                 } else if r.try_get(1)? {
    792                     ChargebackFailureResult::Known(id)
    793                 } else {
    794                     ChargebackFailureResult::Idempotent(id)
    795                 })
    796             })
    797             .fetch_optional(&mut *db)
    798     )?
    799     .unwrap_or(ChargebackFailureResult::Unknown))
    800 }
    801 
    802 /** Get JSON value from KV table */
    803 pub async fn kv_get<T: DeserializeOwned + Unpin + Send>(
    804     db: &mut PgConnection,
    805     key: &str,
    806 ) -> sqlx::Result<Option<T>> {
    807     serialized!(
    808         sqlx::query("SELECT value FROM kv WHERE key=$1")
    809             .bind(key)
    810             .try_map(|r| Ok(r.try_get::<sqlx::types::Json<T>, _>(0)?.0))
    811             .fetch_optional(&mut *db)
    812     )
    813 }
    814 
    815 /** Set JSON value in KV table */
    816 pub async fn kv_set<T: Serialize>(db: &mut PgConnection, key: &str, value: &T) -> sqlx::Result<()> {
    817     serialized!(
    818         sqlx::query("INSERT INTO kv (key, value) VALUES ($1, $2) ON CONFLICT (key) DO UPDATE SET value=EXCLUDED.value")
    819             .bind(key)
    820             .bind(sqlx::types::Json(value))
    821             .execute(&mut *db)
    822     )?;
    823     Ok(())
    824 }
    825 
    826 pub enum RegistrationResult {
    827     Success,
    828     ReservePubReuse,
    829 }
    830 
    831 pub async fn transfer_register(
    832     db: &PgPool,
    833     req: &RegistrationRequest,
    834 ) -> sqlx::Result<RegistrationResult> {
    835     let ty: IncomingType = req.r#type.into();
    836     serialized!(
    837         sqlx::query(
    838             "SELECT out_reserve_pub_reuse FROM register_prepared_transfers($1,$2,$3,$4,$5,$6)"
    839         )
    840         .bind(ty)
    841         .bind(&req.account_pub)
    842         .bind(&req.authorization_pub)
    843         .bind(&req.authorization_sig)
    844         .bind(req.recurrent)
    845         .bind_timestamp(&Timestamp::now())
    846         .try_map(|r: PgRow| {
    847             Ok(if r.try_get_flag("out_reserve_pub_reuse")? {
    848                 RegistrationResult::ReservePubReuse
    849             } else {
    850                 RegistrationResult::Success
    851             })
    852         })
    853         .fetch_one(db)
    854     )
    855 }
    856 
    857 pub async fn transfer_unregister(db: &PgPool, req: &Unregistration) -> sqlx::Result<bool> {
    858     serialized!(
    859         sqlx::query("SELECT out_found FROM delete_prepared_transfers($1)")
    860             .bind(&req.authorization_pub)
    861             .try_map(|r: PgRow| r.try_get_flag("out_found"))
    862             .fetch_one(db)
    863     )
    864 }
    865 
/// Row helpers that rebuild Cyclos payto values from two columns.
///
/// For both methods, `idx` designates the column holding the account id, `name` the
/// column holding the account name, and `root` is cloned into the resulting account.
pub trait CyclosTypeHelper {
    /// Build a [`FullCyclosPayto`] from the `idx` and `name` columns.
    fn try_get_cyclos_fullpayto<I: sqlx::ColumnIndex<Self>>(
        &self,
        idx: I,
        name: I,
        root: &CompactString,
    ) -> sqlx::Result<FullCyclosPayto>;
    /// Build a full [`PaytoURI`] from the `idx` and `name` columns.
    fn try_get_cyclos_fullpaytouri<I: sqlx::ColumnIndex<Self>>(
        &self,
        idx: I,
        name: I,
        root: &CompactString,
    ) -> sqlx::Result<PaytoURI>;
}
    880 
    881 impl CyclosTypeHelper for PgRow {
    882     fn try_get_cyclos_fullpayto<I: sqlx::ColumnIndex<Self>>(
    883         &self,
    884         idx: I,
    885         name: I,
    886         root: &CompactString,
    887     ) -> sqlx::Result<FullCyclosPayto> {
    888         let idx = self.try_get(idx)?;
    889         let name = self.try_get(name)?;
    890         Ok(FullCyclosPayto::new(
    891             CyclosAccount {
    892                 id: CyclosId(idx),
    893                 root: root.clone(),
    894             },
    895             name,
    896         ))
    897     }
    898     fn try_get_cyclos_fullpaytouri<I: sqlx::ColumnIndex<Self>>(
    899         &self,
    900         idx: I,
    901         name: I,
    902         root: &CompactString,
    903     ) -> sqlx::Result<PaytoURI> {
    904         let idx = self.try_get(idx)?;
    905         let name = self.try_get(name)?;
    906         Ok(CyclosAccount {
    907             id: CyclosId(idx),
    908             root: root.clone(),
    909         }
    910         .as_full_uri(name))
    911     }
    912 }
    913 
    914 #[cfg(test)]
    915 mod test {
    916     use std::assert_matches;
    917 
    918     use compact_str::CompactString;
    919     use jiff::{Span, Timestamp};
    920     use serde_json::json;
    921     use sqlx::{PgPool, Postgres, Row as _, pool::PoolConnection, postgres::PgRow};
    922     use taler_api::{
    923         db::TypeHelper,
    924         notification::dummy_listen,
    925         subject::{IncomingSubject, OutgoingSubject},
    926     };
    927     use taler_common::{
    928         api::{
    929             EddsaPublicKey, HashCode, ShortHashCode,
    930             params::{History, Page},
    931             wire::TransferState,
    932         },
    933         types::{
    934             amount::{Currency, decimal},
    935             url,
    936             utils::now_sql_stable_ts,
    937         },
    938     };
    939 
    940     use crate::{
    941         constants::CONFIG_SOURCE,
    942         db::{
    943             self, AddIncomingResult, AddOutgoingResult, BounceResult, ChargebackFailureResult,
    944             Transfer, TransferResult, TxIn, TxInAdmin, TxOut, TxOutKind,
    945         },
    946     };
    947 
    /// Currency handed to the database helpers throughout these tests.
    pub const CURR: Currency = Currency::TEST;
    /// Root handed to the database helpers throughout these tests.
    pub const ROOT: CompactString = CompactString::const_new("localhost");
    950 
    951     async fn setup() -> (PoolConnection<Postgres>, PgPool) {
    952         taler_test_utils::db::db_test_setup(CONFIG_SOURCE).await
    953     }
    954 
    955     #[tokio::test]
    956     async fn kv() {
    957         let (mut db, _) = setup().await;
    958 
    959         let value = json!({
    960             "name": "Mr Smith",
    961             "no way": 32
    962         });
    963 
    964         assert_eq!(
    965             db::kv_get::<serde_json::Value>(&mut db, "value")
    966                 .await
    967                 .unwrap(),
    968             None
    969         );
    970         db::kv_set(&mut db, "value", &value).await.unwrap();
    971         db::kv_set(&mut db, "value", &value).await.unwrap();
    972         assert_eq!(
    973             db::kv_get::<serde_json::Value>(&mut db, "value")
    974                 .await
    975                 .unwrap(),
    976             Some(value)
    977         );
    978     }
    979 
    980     #[tokio::test]
    981     async fn tx_in() {
    982         let (mut db, pool) = setup().await;
    983 
    984         let mut routine = async |first: &Option<IncomingSubject>,
    985                                  second: &Option<IncomingSubject>| {
    986             let id = sqlx::query("SELECT count(*) + 1 FROM tx_in")
    987                 .try_map(|r: PgRow| r.try_get_u64(0))
    988                 .fetch_one(&mut *db)
    989                 .await
    990                 .unwrap();
    991             let now = now_sql_stable_ts();
    992             let later = now + Span::new().hours(2);
    993             let tx = TxIn {
    994                 transfer_id: now.as_microsecond() as i64,
    995                 tx_id: None,
    996                 amount: decimal("10"),
    997                 subject: "subject".to_owned(),
    998                 debtor_id: 31000163100000000,
    999                 debtor_name: "Name".into(),
   1000                 valued_at: now,
   1001             };
   1002             // Insert
   1003             assert_eq!(
   1004                 db::register_tx_in(&mut db, &tx, first, &now)
   1005                     .await
   1006                     .expect("register tx in"),
   1007                 AddIncomingResult::Success {
   1008                     new: true,
   1009                     pending: false,
   1010                     row_id: id,
   1011                     valued_at: now,
   1012                 }
   1013             );
   1014             // Idempotent
   1015             assert_eq!(
   1016                 db::register_tx_in(
   1017                     &mut db,
   1018                     &TxIn {
   1019                         valued_at: later,
   1020                         ..tx.clone()
   1021                     },
   1022                     first,
   1023                     &now
   1024                 )
   1025                 .await
   1026                 .expect("register tx in"),
   1027                 AddIncomingResult::Success {
   1028                     new: false,
   1029                     pending: false,
   1030                     row_id: id,
   1031                     valued_at: now
   1032                 }
   1033             );
   1034             // Many
   1035             assert_eq!(
   1036                 db::register_tx_in(
   1037                     &mut db,
   1038                     &TxIn {
   1039                         transfer_id: later.as_microsecond() as i64,
   1040                         valued_at: later,
   1041                         ..tx
   1042                     },
   1043                     second,
   1044                     &now
   1045                 )
   1046                 .await
   1047                 .expect("register tx in"),
   1048                 AddIncomingResult::Success {
   1049                     new: true,
   1050                     pending: false,
   1051                     row_id: id + 1,
   1052                     valued_at: later
   1053                 }
   1054             );
   1055         };
   1056 
   1057         // Empty db
   1058         assert_eq!(
   1059             db::revenue_history(&pool, &History::default(), &CURR, &ROOT, dummy_listen)
   1060                 .await
   1061                 .unwrap(),
   1062             Vec::new()
   1063         );
   1064         assert_eq!(
   1065             db::incoming_history(&pool, &History::default(), &CURR, &ROOT, dummy_listen)
   1066                 .await
   1067                 .unwrap(),
   1068             Vec::new()
   1069         );
   1070 
   1071         // Regular transaction
   1072         routine(&None, &None).await;
   1073 
   1074         // Reserve transaction
   1075         routine(
   1076             &Some(IncomingSubject::Reserve(EddsaPublicKey::rand())),
   1077             &Some(IncomingSubject::Reserve(EddsaPublicKey::rand())),
   1078         )
   1079         .await;
   1080 
   1081         // Kyc transaction
   1082         routine(
   1083             &Some(IncomingSubject::Kyc(EddsaPublicKey::rand())),
   1084             &Some(IncomingSubject::Kyc(EddsaPublicKey::rand())),
   1085         )
   1086         .await;
   1087 
   1088         // History
   1089         assert_eq!(
   1090             db::revenue_history(&pool, &History::default(), &CURR, &ROOT, dummy_listen)
   1091                 .await
   1092                 .unwrap()
   1093                 .len(),
   1094             6
   1095         );
   1096         assert_eq!(
   1097             db::incoming_history(&pool, &History::default(), &CURR, &ROOT, dummy_listen)
   1098                 .await
   1099                 .unwrap()
   1100                 .len(),
   1101             4
   1102         );
   1103     }
   1104 
   1105     #[tokio::test]
   1106     async fn tx_in_admin() {
   1107         let (_, pool) = setup().await;
   1108 
   1109         // Empty db
   1110         assert_eq!(
   1111             db::incoming_history(&pool, &History::default(), &CURR, &ROOT, dummy_listen)
   1112                 .await
   1113                 .unwrap(),
   1114             Vec::new()
   1115         );
   1116 
   1117         let now = now_sql_stable_ts();
   1118         let later = now + Span::new().hours(2);
   1119         let tx = TxInAdmin {
   1120             amount: decimal("10"),
   1121             subject: "subject".to_owned(),
   1122             debtor_id: 31000163100000000,
   1123             debtor_name: "Name".into(),
   1124             metadata: IncomingSubject::Reserve(EddsaPublicKey::rand()),
   1125         };
   1126         // Insert
   1127         assert_eq!(
   1128             db::register_tx_in_admin(&pool, &tx, &now)
   1129                 .await
   1130                 .expect("register tx in"),
   1131             AddIncomingResult::Success {
   1132                 new: true,
   1133                 pending: false,
   1134                 row_id: 1,
   1135                 valued_at: now
   1136             }
   1137         );
   1138         // Many
   1139         assert_eq!(
   1140             db::register_tx_in_admin(
   1141                 &pool,
   1142                 &TxInAdmin {
   1143                     subject: "Other".to_owned(),
   1144                     metadata: IncomingSubject::Reserve(EddsaPublicKey::rand()),
   1145                     ..tx.clone()
   1146                 },
   1147                 &later
   1148             )
   1149             .await
   1150             .expect("register tx in"),
   1151             AddIncomingResult::Success {
   1152                 new: true,
   1153                 pending: false,
   1154                 row_id: 2,
   1155                 valued_at: later
   1156             }
   1157         );
   1158 
   1159         // History
   1160         assert_eq!(
   1161             db::incoming_history(&pool, &History::default(), &CURR, &ROOT, dummy_listen)
   1162                 .await
   1163                 .unwrap()
   1164                 .len(),
   1165             2
   1166         );
   1167     }
   1168 
   1169     #[tokio::test]
   1170     async fn tx_out() {
   1171         let (mut db, pool) = setup().await;
   1172 
   1173         let mut routine = async |first: &TxOutKind, second: &TxOutKind| {
   1174             let transfer_id = sqlx::query("SELECT count(*) + 1 FROM tx_out")
   1175                 .try_map(|r: PgRow| r.try_get(0))
   1176                 .fetch_one(&mut *db)
   1177                 .await
   1178                 .unwrap();
   1179             let now = now_sql_stable_ts();
   1180             let later = now + Span::new().hours(2);
   1181             let tx = TxOut {
   1182                 transfer_id,
   1183                 tx_id: Some(transfer_id),
   1184                 amount: decimal("10"),
   1185                 subject: "subject".to_owned(),
   1186                 creditor_id: 31000163100000000,
   1187                 creditor_name: "Name".into(),
   1188                 valued_at: now,
   1189             };
   1190             assert_matches!(
   1191                 db::make_transfer(
   1192                     &pool,
   1193                     &Transfer {
   1194                         request_uid: HashCode::rand(),
   1195                         amount: decimal("10"),
   1196                         exchange_base_url: url("https://exchange.test.com/"),
   1197                         metadata: None,
   1198                         wtid: ShortHashCode::rand(),
   1199                         creditor_id: 31000163100000000,
   1200                         creditor_name: "Name".into()
   1201                     },
   1202                     &now
   1203                 )
   1204                 .await
   1205                 .unwrap(),
   1206                 TransferResult::Success { .. }
   1207             );
   1208             db::initiated_submit_success(&mut db, 1, &Timestamp::now(), transfer_id)
   1209                 .await
   1210                 .expect("status success");
   1211 
   1212             // Insert
   1213             assert_eq!(
   1214                 db::register_tx_out(&mut db, &tx, first, &now)
   1215                     .await
   1216                     .expect("register tx out"),
   1217                 AddOutgoingResult {
   1218                     result: db::RegisterResult::known,
   1219                     row_id: transfer_id,
   1220                 }
   1221             );
   1222             // Idempotent
   1223             assert_eq!(
   1224                 db::register_tx_out(
   1225                     &mut db,
   1226                     &TxOut {
   1227                         valued_at: later,
   1228                         ..tx.clone()
   1229                     },
   1230                     first,
   1231                     &now
   1232                 )
   1233                 .await
   1234                 .expect("register tx out"),
   1235                 AddOutgoingResult {
   1236                     result: db::RegisterResult::idempotent,
   1237                     row_id: transfer_id,
   1238                 }
   1239             );
   1240             // Recovered
   1241             assert_eq!(
   1242                 db::register_tx_out(
   1243                     &mut db,
   1244                     &TxOut {
   1245                         transfer_id: transfer_id + 1,
   1246                         tx_id: Some(transfer_id + 1),
   1247                         valued_at: later,
   1248                         ..tx.clone()
   1249                     },
   1250                     second,
   1251                     &now
   1252                 )
   1253                 .await
   1254                 .expect("register tx out"),
   1255                 AddOutgoingResult {
   1256                     result: db::RegisterResult::recovered,
   1257                     row_id: transfer_id + 1,
   1258                 }
   1259             );
   1260         };
   1261 
   1262         // Empty db
   1263         assert_eq!(
   1264             db::outgoing_history(&pool, &History::default(), &CURR, &ROOT, dummy_listen)
   1265                 .await
   1266                 .unwrap(),
   1267             Vec::new()
   1268         );
   1269 
   1270         // Regular transaction
   1271         routine(&TxOutKind::Simple, &TxOutKind::Simple).await;
   1272 
   1273         // Talerable transaction
   1274         routine(
   1275             &TxOutKind::Talerable(OutgoingSubject::rand()),
   1276             &TxOutKind::Talerable(OutgoingSubject::rand()),
   1277         )
   1278         .await;
   1279 
   1280         // Bounced transaction
   1281         routine(&TxOutKind::Bounce(21), &TxOutKind::Bounce(42)).await;
   1282 
   1283         // History
   1284         assert_eq!(
   1285             db::outgoing_history(&pool, &History::default(), &CURR, &ROOT, dummy_listen)
   1286                 .await
   1287                 .unwrap()
   1288                 .len(),
   1289             2
   1290         );
   1291     }
   1292 
   1293     // TODO tx out failure
   1294 
   1295     #[tokio::test]
   1296     async fn transfer() {
   1297         let (_, pool) = setup().await;
   1298 
   1299         // Empty db
   1300         assert_eq!(
   1301             db::transfer_by_id(&pool, 0, &CURR, &ROOT).await.unwrap(),
   1302             None
   1303         );
   1304         assert_eq!(
   1305             db::transfer_page(&pool, &None, &CURR, &ROOT, &Page::default())
   1306                 .await
   1307                 .unwrap(),
   1308             Vec::new()
   1309         );
   1310 
   1311         let req = Transfer {
   1312             request_uid: HashCode::rand(),
   1313             amount: decimal("10"),
   1314             exchange_base_url: url("https://exchange.test.com/"),
   1315             metadata: None,
   1316             wtid: ShortHashCode::rand(),
   1317             creditor_id: 31000163100000000,
   1318             creditor_name: "Name".into(),
   1319         };
   1320         let now = now_sql_stable_ts();
   1321         let later = now + Span::new().hours(2);
   1322         // Insert
   1323         assert_eq!(
   1324             db::make_transfer(&pool, &req, &now)
   1325                 .await
   1326                 .expect("transfer"),
   1327             TransferResult::Success {
   1328                 id: 1,
   1329                 initiated_at: now
   1330             }
   1331         );
   1332         // Idempotent
   1333         assert_eq!(
   1334             db::make_transfer(&pool, &req, &later)
   1335                 .await
   1336                 .expect("transfer"),
   1337             TransferResult::Success {
   1338                 id: 1,
   1339                 initiated_at: now
   1340             }
   1341         );
   1342         // Request UID reuse
   1343         assert_eq!(
   1344             db::make_transfer(
   1345                 &pool,
   1346                 &Transfer {
   1347                     wtid: ShortHashCode::rand(),
   1348                     ..req.clone()
   1349                 },
   1350                 &now
   1351             )
   1352             .await
   1353             .expect("transfer"),
   1354             TransferResult::RequestUidReuse
   1355         );
   1356         // wtid reuse
   1357         assert_eq!(
   1358             db::make_transfer(
   1359                 &pool,
   1360                 &Transfer {
   1361                     request_uid: HashCode::rand(),
   1362                     ..req.clone()
   1363                 },
   1364                 &now
   1365             )
   1366             .await
   1367             .expect("transfer"),
   1368             TransferResult::WtidReuse
   1369         );
   1370         // Many
   1371         assert_eq!(
   1372             db::make_transfer(
   1373                 &pool,
   1374                 &Transfer {
   1375                     request_uid: HashCode::rand(),
   1376                     wtid: ShortHashCode::rand(),
   1377                     ..req
   1378                 },
   1379                 &later
   1380             )
   1381             .await
   1382             .expect("transfer"),
   1383             TransferResult::Success {
   1384                 id: 2,
   1385                 initiated_at: later
   1386             }
   1387         );
   1388 
   1389         // Get
   1390         assert!(
   1391             db::transfer_by_id(&pool, 1, &CURR, &ROOT)
   1392                 .await
   1393                 .unwrap()
   1394                 .is_some()
   1395         );
   1396         assert!(
   1397             db::transfer_by_id(&pool, 2, &CURR, &ROOT)
   1398                 .await
   1399                 .unwrap()
   1400                 .is_some()
   1401         );
   1402         assert!(
   1403             db::transfer_by_id(&pool, 3, &CURR, &ROOT)
   1404                 .await
   1405                 .unwrap()
   1406                 .is_none()
   1407         );
   1408         assert_eq!(
   1409             db::transfer_page(&pool, &None, &CURR, &ROOT, &Page::default())
   1410                 .await
   1411                 .unwrap()
   1412                 .len(),
   1413             2
   1414         );
   1415     }
   1416 
   1417     #[tokio::test]
   1418     async fn bounce() {
   1419         let (mut db, _) = setup().await;
   1420 
   1421         let amount = decimal("10");
   1422         let now = now_sql_stable_ts();
   1423 
   1424         // Bounce
   1425         assert_eq!(
   1426             db::register_bounced_tx_in(
   1427                 &mut db,
   1428                 &TxIn {
   1429                     transfer_id: 12,
   1430                     tx_id: None,
   1431                     amount,
   1432                     subject: "subject".to_owned(),
   1433                     debtor_id: 31000163100000000,
   1434                     debtor_name: "Name".into(),
   1435                     valued_at: now
   1436                 },
   1437                 22,
   1438                 "good reason",
   1439                 &now
   1440             )
   1441             .await
   1442             .expect("bounce"),
   1443             BounceResult {
   1444                 tx_id: 1,
   1445                 tx_new: true
   1446             }
   1447         );
   1448         // Idempotent
   1449         assert_eq!(
   1450             db::register_bounced_tx_in(
   1451                 &mut db,
   1452                 &TxIn {
   1453                     transfer_id: 12,
   1454                     tx_id: None,
   1455                     amount,
   1456                     subject: "subject".to_owned(),
   1457                     debtor_id: 31000163100000000,
   1458                     debtor_name: "Name".into(),
   1459                     valued_at: now
   1460                 },
   1461                 22,
   1462                 "good reason",
   1463                 &now
   1464             )
   1465             .await
   1466             .expect("bounce"),
   1467             BounceResult {
   1468                 tx_id: 1,
   1469                 tx_new: false
   1470             }
   1471         );
   1472 
   1473         // Many
   1474         assert_eq!(
   1475             db::register_bounced_tx_in(
   1476                 &mut db,
   1477                 &TxIn {
   1478                     transfer_id: 13,
   1479                     tx_id: None,
   1480                     amount,
   1481                     subject: "subject".to_owned(),
   1482                     debtor_id: 31000163100000000,
   1483                     debtor_name: "Name".into(),
   1484                     valued_at: now
   1485                 },
   1486                 23,
   1487                 "good reason",
   1488                 &now
   1489             )
   1490             .await
   1491             .expect("bounce"),
   1492             BounceResult {
   1493                 tx_id: 2,
   1494                 tx_new: true
   1495             }
   1496         );
   1497     }
   1498 
   1499     #[tokio::test]
   1500     async fn status() {
   1501         let (mut db, pool) = setup().await;
   1502 
   1503         let check_status = async |id: u64, status: TransferState, msg: Option<&str>| {
   1504             let transfer = db::transfer_by_id(&pool, id, &CURR, &ROOT)
   1505                 .await
   1506                 .unwrap()
   1507                 .unwrap();
   1508             assert_eq!(
   1509                 (status, msg),
   1510                 (transfer.status, transfer.status_msg.as_deref())
   1511             );
   1512         };
   1513 
   1514         // Unknown transfer
   1515         db::initiated_submit_permanent_failure(&mut db, 1, "msg")
   1516             .await
   1517             .unwrap();
   1518         db::initiated_submit_success(&mut db, 1, &Timestamp::now(), 12)
   1519             .await
   1520             .unwrap();
   1521         assert_eq!(
   1522             db::initiated_chargeback_failure(&mut db, 1).await.unwrap(),
   1523             ChargebackFailureResult::Unknown
   1524         );
   1525 
   1526         // Failure
   1527         db::make_transfer(
   1528             &pool,
   1529             &Transfer {
   1530                 request_uid: HashCode::rand(),
   1531                 amount: decimal("1"),
   1532                 exchange_base_url: url("https://exchange.test.com/"),
   1533                 metadata: None,
   1534                 wtid: ShortHashCode::rand(),
   1535                 creditor_id: 31000163100000000,
   1536                 creditor_name: "Name".into(),
   1537             },
   1538             &Timestamp::now(),
   1539         )
   1540         .await
   1541         .expect("transfer");
   1542         check_status(1, TransferState::pending, None).await;
   1543         db::initiated_submit_permanent_failure(&mut db, 1, "error status")
   1544             .await
   1545             .unwrap();
   1546         check_status(1, TransferState::permanent_failure, Some("error status")).await;
   1547 
   1548         // Success
   1549         db::make_transfer(
   1550             &pool,
   1551             &Transfer {
   1552                 request_uid: HashCode::rand(),
   1553                 amount: decimal("1"),
   1554                 exchange_base_url: url("https://exchange.test.com/"),
   1555                 metadata: None,
   1556                 wtid: ShortHashCode::rand(),
   1557                 creditor_id: 31000163100000000,
   1558                 creditor_name: "Name".into(),
   1559             },
   1560             &Timestamp::now(),
   1561         )
   1562         .await
   1563         .expect("transfer");
   1564         check_status(2, TransferState::pending, None).await;
   1565         db::initiated_submit_success(&mut db, 2, &Timestamp::now(), 3)
   1566             .await
   1567             .unwrap();
   1568         check_status(2, TransferState::pending, None).await;
   1569         db::register_tx_out(
   1570             &mut db,
   1571             &TxOut {
   1572                 transfer_id: 5,
   1573                 tx_id: Some(3),
   1574                 amount: decimal("2"),
   1575                 subject: "".to_string(),
   1576                 creditor_id: 31000163100000000,
   1577                 creditor_name: "Name".into(),
   1578                 valued_at: Timestamp::now(),
   1579             },
   1580             &TxOutKind::Simple,
   1581             &Timestamp::now(),
   1582         )
   1583         .await
   1584         .unwrap();
   1585         check_status(2, TransferState::success, None).await;
   1586 
   1587         // Chargeback
   1588         assert_eq!(
   1589             db::initiated_chargeback_failure(&mut db, 5).await.unwrap(),
   1590             ChargebackFailureResult::Known(2)
   1591         );
   1592         check_status(2, TransferState::late_failure, Some("charged back")).await;
   1593         assert_eq!(
   1594             db::initiated_chargeback_failure(&mut db, 5).await.unwrap(),
   1595             ChargebackFailureResult::Idempotent(2)
   1596         );
   1597     }
   1598 
   1599     #[tokio::test]
   1600     async fn batch() {
   1601         let (mut db, pool) = setup().await;
   1602         let start = Timestamp::now();
   1603 
   1604         // Empty db
   1605         let pendings = db::pending_batch(&mut db, &start)
   1606             .await
   1607             .expect("pending_batch");
   1608         assert_eq!(pendings.len(), 0);
   1609 
   1610         // Some transfers
   1611         for i in 0..3 {
   1612             db::make_transfer(
   1613                 &pool,
   1614                 &Transfer {
   1615                     request_uid: HashCode::rand(),
   1616                     amount: decimal(format!("{}", i + 1)),
   1617                     exchange_base_url: url("https://exchange.test.com/"),
   1618                     metadata: None,
   1619                     wtid: ShortHashCode::rand(),
   1620                     creditor_id: 31000163100000000,
   1621                     creditor_name: "Name".into(),
   1622                 },
   1623                 &Timestamp::now(),
   1624             )
   1625             .await
   1626             .expect("transfer");
   1627         }
   1628         let pendings = db::pending_batch(&mut db, &start)
   1629             .await
   1630             .expect("pending_batch");
   1631         assert_eq!(pendings.len(), 3);
   1632 
   1633         // Max 100 txs in batch
   1634         for i in 0..100 {
   1635             db::make_transfer(
   1636                 &pool,
   1637                 &Transfer {
   1638                     request_uid: HashCode::rand(),
   1639                     amount: decimal(format!("{}", i + 1)),
   1640                     exchange_base_url: url("https://exchange.test.com/"),
   1641                     metadata: None,
   1642                     wtid: ShortHashCode::rand(),
   1643                     creditor_id: 31000163100000000,
   1644                     creditor_name: "Name".into(),
   1645                 },
   1646                 &Timestamp::now(),
   1647             )
   1648             .await
   1649             .expect("transfer");
   1650         }
   1651         let pendings = db::pending_batch(&mut db, &start)
   1652             .await
   1653             .expect("pending_batch");
   1654         assert_eq!(pendings.len(), 100);
   1655 
   1656         // Skip uploaded
   1657         for i in 0..=10 {
   1658             db::initiated_submit_success(&mut db, i, &Timestamp::now(), i)
   1659                 .await
   1660                 .expect("status success");
   1661         }
   1662         let pendings = db::pending_batch(&mut db, &start)
   1663             .await
   1664             .expect("pending_batch");
   1665         assert_eq!(pendings.len(), 93);
   1666 
   1667         // Skip failed
   1668         for i in 0..=10 {
   1669             db::initiated_submit_permanent_failure(&mut db, 10 + i, "failure")
   1670                 .await
   1671                 .expect("status failure");
   1672         }
   1673         let pendings = db::pending_batch(&mut db, &start)
   1674             .await
   1675             .expect("pending_batch");
   1676         assert_eq!(pendings.len(), 83);
   1677     }
   1678 }