taler-rust

GNU Taler code in Rust. Largely core banking integrations.
Log | Files | Refs | Submodules | README | LICENSE

magnet-bank-harness.rs (18067B)


      1 /*
      2   This file is part of TALER
      3   Copyright (C) 2025, 2026 Taler Systems SA
      4 
      5   TALER is free software; you can redistribute it and/or modify it under the
      6   terms of the GNU Affero General Public License as published by the Free Software
      7   Foundation; either version 3, or (at your option) any later version.
      8 
      9   TALER is distributed in the hope that it will be useful, but WITHOUT ANY
     10   WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
     11   A PARTICULAR PURPOSE.  See the GNU Affero General Public License for more details.
     12 
     13   You should have received a copy of the GNU Affero General Public License along with
     14   TALER; see the file COPYING.  If not, see <http://www.gnu.org/licenses/>
     15 */
     16 
     17 use std::{fmt::Debug, time::Duration};
     18 
     19 use aws_lc_rs::signature::EcdsaKeyPair;
     20 use clap::Parser as _;
     21 use failure_injection::{InjectedErr, set_failure_scenario};
     22 use jiff::{Timestamp, Zoned};
     23 use owo_colors::OwoColorize;
     24 use sqlx::PgPool;
     25 use taler_api::notification::dummy_listen;
     26 use taler_build::long_version;
     27 use taler_common::{
     28     CommonArgs,
     29     api_common::{EddsaPublicKey, HashCode, ShortHashCode},
     30     api_params::{History, Page},
     31     api_wire::{IncomingBankTransaction, TransferState},
     32     config::Config,
     33     db::{dbinit, pool},
     34     taler_main,
     35     types::{amount::decimal, url},
     36 };
     37 use taler_magnet_bank::{
     38     FullHuPayto, HuIban,
     39     config::{AccountType, HarnessCfg, parse_db_cfg},
     40     constants::CONFIG_SOURCE,
     41     db::{self, TransferResult},
     42     magnet_api::{
     43         client::{ApiClient, AuthClient},
     44         types::{Account, Direction, Order, TxDto, TxStatus},
     45     },
     46     setup::{self, Keys},
     47     worker::{Worker, WorkerError, WorkerResult, run_worker},
     48 };
     49 
// TODO: factor the duplicated retry/expect polling loops into a macro
     51 
     52 /// Taler Magnet Bank Adapter harness test suite
     53 #[derive(clap::Parser, Debug)]
     54 #[command(long_version = long_version(), about, long_about = None)]
     55 struct Args {
     56     #[clap(flatten)]
     57     common: CommonArgs,
     58 
     59     #[command(subcommand)]
     60     cmd: Command,
     61 }
     62 
     63 #[derive(clap::Subcommand, Debug)]
     64 enum Command {
     65     /// Run logic tests
     66     Logic {
     67         #[clap(long, short)]
     68         reset: bool,
     69     },
     70     /// Run online tests
     71     Online {
     72         #[clap(long, short)]
     73         reset: bool,
     74     },
     75 }
     76 
     77 /// Custom client for harness actions
     78 struct Harness<'a> {
     79     cfg: &'a HarnessCfg,
     80     pool: &'a PgPool,
     81     api: ApiClient<'a>,
     82     exchange: Account,
     83     client: Account,
     84     signing_key: &'a EcdsaKeyPair,
     85 }
     86 
     87 impl<'a> Harness<'a> {
     88     async fn new(
     89         cfg: &'a HarnessCfg,
     90         client: &'a http_client::Client,
     91         pool: &'a PgPool,
     92         keys: &'a Keys,
     93     ) -> Self {
     94         let api = AuthClient::new(client, &cfg.worker.api_url, &cfg.worker.consumer)
     95             .upgrade(&keys.access_token);
     96         let (exchange, client) = tokio::try_join!(
     97             api.account(cfg.worker.payto.bban()),
     98             api.account(cfg.client_payto.bban())
     99         )
    100         .unwrap();
    101         Self {
    102             cfg,
    103             pool,
    104             api,
    105             exchange,
    106             client,
    107             signing_key: &keys.signing_key,
    108         }
    109     }
    110 
    111     async fn worker(&'a self) -> WorkerResult {
    112         let db = &mut self.pool.acquire().await.unwrap().detach();
    113         Worker {
    114             client: &self.api,
    115             db,
    116             account_number: &self.exchange.number,
    117             account_code: self.exchange.code,
    118             key: self.signing_key,
    119             account_type: AccountType::Exchange,
    120             ignore_tx_before: self.cfg.worker.ignore_tx_before,
    121             ignore_bounces_before: self.cfg.worker.ignore_bounces_before,
    122         }
    123         .run()
    124         .await
    125     }
    126 
    127     async fn balance(&self) -> (u32, u32) {
    128         let (exchange_balance, client_balance) = tokio::try_join!(
    129             self.api.balance_mini(self.exchange.iban.bban()),
    130             self.api.balance_mini(self.client.iban.bban())
    131         )
    132         .unwrap();
    133         (
    134             exchange_balance.balance as u32,
    135             client_balance.balance as u32,
    136         )
    137     }
    138 
    139     async fn custom_transfer(&self, forint: u32, creditor: FullHuPayto) -> u64 {
    140         let res = db::make_transfer(
    141             self.pool,
    142             &db::Transfer {
    143                 request_uid: HashCode::rand(),
    144                 amount: decimal(format!("{forint}")),
    145                 exchange_base_url: url("https://test.com"),
    146                 metadata: None,
    147                 wtid: ShortHashCode::rand(),
    148                 creditor,
    149             },
    150             &Timestamp::now(),
    151         )
    152         .await
    153         .unwrap();
    154         match res {
    155             TransferResult::Success { id, .. } => id,
    156             TransferResult::RequestUidReuse | TransferResult::WtidReuse => unreachable!(),
    157         }
    158     }
    159 
    160     async fn transfer(&self, forint: u32) -> u64 {
    161         self.custom_transfer(forint, FullHuPayto::new(self.client.iban.clone(), "Name"))
    162             .await
    163     }
    164 
    165     async fn expect_transfer_status(&self, id: u64, status: TransferState, msg: Option<&str>) {
    166         let mut attempts = 0;
    167         loop {
    168             let transfer = db::transfer_by_id(self.pool, id).await.unwrap().unwrap();
    169             if (transfer.status, transfer.status_msg.as_deref()) == (status, msg) {
    170                 return;
    171             }
    172             if attempts > 40 {
    173                 assert_eq!(
    174                     (transfer.status, transfer.status_msg.as_deref()),
    175                     (status, msg)
    176                 );
    177             }
    178             attempts += 1;
    179             tokio::time::sleep(Duration::from_millis(200)).await;
    180         }
    181     }
    182 
    183     async fn expect_incoming(&self, key: EddsaPublicKey) {
    184         let transfer = db::incoming_history(
    185             self.pool,
    186             &History {
    187                 page: Page {
    188                     limit: -1,
    189                     offset: None,
    190                 },
    191                 timeout_ms: None,
    192             },
    193             dummy_listen,
    194         )
    195         .await
    196         .unwrap();
    197         assert!(matches!(
    198             transfer.first().unwrap(),
    199             IncomingBankTransaction::Reserve { reserve_pub, .. } if *reserve_pub == key,
    200         ));
    201     }
    202 
    203     /// Send a transaction between two magnet accounts
    204     async fn send_tx(&self, from: &Account, to: &HuIban, subject: &str, amount: u32) -> u64 {
    205         let now = Zoned::now();
    206         let info = self
    207             .api
    208             .init_tx(
    209                 from.code,
    210                 amount as f64,
    211                 subject,
    212                 &now.date(),
    213                 "Name",
    214                 to.bban(),
    215             )
    216             .await
    217             .unwrap();
    218         self.api
    219             .submit_tx(
    220                 self.signing_key,
    221                 &from.number,
    222                 info.code,
    223                 info.amount,
    224                 &now.date(),
    225                 to.bban(),
    226             )
    227             .await
    228             .unwrap();
    229         info.code
    230     }
    231 
    232     async fn latest_tx(&self, account: &Account) -> TxDto {
    233         self.api
    234             .page_tx(
    235                 Direction::Both,
    236                 Order::Descending,
    237                 1,
    238                 account.iban.bban(),
    239                 &None,
    240                 true,
    241             )
    242             .await
    243             .unwrap()
    244             .list
    245             .pop()
    246             .unwrap()
    247             .tx
    248     }
    249 
    250     /// Send transaction from client to exchange
    251     async fn client_send(&self, subject: &str, amount: u32) -> u64 {
    252         self.send_tx(&self.client, &self.exchange.iban, subject, amount)
    253             .await
    254     }
    255 
    256     /// Send transaction from exchange to client
    257     async fn exchange_send_to(&self, subject: &str, amount: u32, to: &HuIban) -> u64 {
    258         self.send_tx(&self.exchange, to, subject, amount).await
    259     }
    260 
    261     /// Send transaction from exchange to client
    262     async fn exchange_send(&self, subject: &str, amount: u32) -> u64 {
    263         self.exchange_send_to(subject, amount, &self.client.iban)
    264             .await
    265     }
    266 
    267     async fn expect_status(&self, code: u64, status: TxStatus) {
    268         let mut attempts = 0;
    269         loop {
    270             let current = self.api.get_tx(code).await.unwrap().status;
    271             if current == status {
    272                 return;
    273             }
    274             if attempts > 40 {
    275                 assert_eq!(current, status, "{code}");
    276             }
    277             attempts += 1;
    278             tokio::time::sleep(Duration::from_millis(200)).await;
    279         }
    280     }
    281 }
    282 
    283 struct Balances<'a> {
    284     client: &'a Harness<'a>,
    285     exchange_balance: u32,
    286     client_balance: u32,
    287 }
    288 
    289 impl<'a> Balances<'a> {
    290     pub async fn new(client: &'a Harness<'a>) -> Self {
    291         let (exchange_balance, client_balance) = client.balance().await;
    292         Self {
    293             client,
    294             exchange_balance,
    295             client_balance,
    296         }
    297     }
    298 
    299     async fn expect(&mut self, diff: i32) {
    300         self.exchange_balance = (self.exchange_balance as i32 + diff) as u32;
    301         self.client_balance = (self.client_balance as i32 - diff) as u32;
    302         let mut attempts = 0;
    303         loop {
    304             let current = self.client.balance().await;
    305             if current == (self.exchange_balance, self.client_balance) {
    306                 return;
    307             }
    308             if attempts > 40 {
    309                 assert_eq!(
    310                     current,
    311                     (self.exchange_balance, self.client_balance),
    312                     "{current:?} {diff}"
    313                 );
    314             }
    315             attempts += 1;
    316             tokio::time::sleep(Duration::from_millis(200)).await;
    317         }
    318     }
    319 }
    320 
    321 fn step(step: &str) {
    322     println!("{}", step.green());
    323 }
    324 
/// Run logic tests against local Magnet Bank backend
///
/// Prepares the database (optionally resetting it when `reset` is set), builds
/// a [`Harness`] from `cfg` and then drives the worker manually through a
/// sequence of scenarios: incoming talerable and malformed transactions,
/// outgoing transfers (valid, to self, to an unknown account), unexpected
/// outgoing transactions, injected failures and bounce recovery.
///
/// Returns an error when setup or a worker run fails unexpectedly; failed
/// expectations panic.
async fn logic_harness(cfg: &Config, reset: bool) -> anyhow::Result<()> {
    step("Run Magnet Bank logic harness tests");

    step("Prepare db");
    let db_cfg = parse_db_cfg(cfg)?;
    let pool = pool(db_cfg.cfg, "magnet_bank").await?;
    let mut db = pool.acquire().await?.detach();
    dbinit(&mut db, db_cfg.sql_dir.as_ref(), "magnet-bank", reset).await?;

    // From here on `cfg` is the parsed harness configuration
    let cfg = HarnessCfg::parse(cfg)?;
    let keys = setup::load(&cfg.worker)?;
    let client = http_client::client()?;

    let harness = Harness::new(&cfg, &client, &pool, &keys).await;

    step("Warmup");
    // Run the worker twice with a pause in between
    // NOTE(review): presumably this lets pre-existing account activity settle
    // before the balances are snapshotted below - confirm
    harness.worker().await?;
    tokio::time::sleep(Duration::from_secs(5)).await;
    harness.worker().await?;

    // Payto used below as a creditor for transfers that are expected to fail
    let unknown_account =
        FullHuPayto::new(HuIban::from_bban("1620000310991642").unwrap(), "Unknown");
    // Timestamp embedded in the subjects of the transactions sent by this run
    let now = Timestamp::now();
    // Balance tracker: `expect(diff)` adds `diff` to the expected exchange
    // balance, subtracts it from the expected client balance and waits for the
    // live balances to match
    let balance = &mut Balances::new(&harness).await;

    step("Test incoming talerable transaction");
    // Send talerable transaction
    let reserve_pub = EddsaPublicKey::rand();
    harness
        .client_send(&format!("Taler {reserve_pub}"), 33)
        .await;
    // Wait for transaction to finalize
    balance.expect(33).await;
    // Sync and register
    harness.worker().await?;
    harness.expect_incoming(reserve_pub).await;

    step("Test incoming malformed transaction");
    // Send malformed transaction
    harness
        .client_send(&format!("Malformed test {now}"), 34)
        .await;
    // Wait for transaction to finalize
    balance.expect(34).await;
    // Sync and bounce
    harness.worker().await?;
    // Wait for bounce to finalize
    balance.expect(-34).await;
    harness.worker().await?;

    step("Test transfer transactions");
    // Init a transfer to client
    let transfer_id = harness
        .custom_transfer(102, FullHuPayto::new(harness.client.iban.clone(), "Client"))
        .await;
    // Should send
    harness.worker().await?;
    // Check transfer is still pending
    harness
        .expect_transfer_status(transfer_id, TransferState::pending, None)
        .await;
    // Wait for transaction to finalize
    balance.expect(-102).await;
    // Should register
    harness.worker().await?;
    // Check transfer is now successful
    harness
        .expect_transfer_status(transfer_id, TransferState::success, None)
        .await;

    step("Test transfer to self");
    // Init a transfer to self
    let transfer_id = harness
        .custom_transfer(101, FullHuPayto::new(harness.exchange.iban.clone(), "Self"))
        .await;
    // Should fail
    harness.worker().await?;
    // Check transfer failed with the expected error message
    harness
        .expect_transfer_status(
            transfer_id,
            TransferState::permanent_failure,
            Some("409 FORRAS_SZAMLA_ESZAMLA_EGYEZIK 'A forrás és az ellenszámla egyezik!'"),
        )
        .await;

    step("Test transfer to unknown account");
    // Init a transfer to the unknown account
    let transfer_id = harness.custom_transfer(103, unknown_account.clone()).await;
    harness.worker().await?;
    // Check transfer is still pending after the first worker run
    harness
        .expect_transfer_status(transfer_id, TransferState::pending, None)
        .await;
    // Balances are expected to be unchanged
    balance.expect(0).await;
    harness.worker().await?;
    // Check transfer is now a permanent failure, without status message
    harness
        .expect_transfer_status(transfer_id, TransferState::permanent_failure, None)
        .await;

    step("Test unexpected outgoing");
    // Manual tx from the exchange
    harness
        .exchange_send(&format!("What is this ? {now}"), 4)
        .await;
    harness.worker().await?;
    // Wait for transaction to finalize
    balance.expect(-4).await;
    harness.worker().await?;

    step("Test transfer failure init-tx");
    harness.transfer(10).await;
    // Arm the failure injection: the next worker run must fail at `init-tx`
    set_failure_scenario(&["init-tx"]);
    assert!(matches!(
        harness.worker().await,
        Err(WorkerError::Injected(InjectedErr("init-tx")))
    ));
    // The following run must succeed and the transfer must go through
    harness.worker().await?;
    balance.expect(-10).await;
    harness.worker().await?;

    step("Test transfer failure submit-tx");
    harness.transfer(11).await;
    // Arm the failure injection: the next worker run must fail at `submit-tx`
    set_failure_scenario(&["submit-tx"]);
    assert!(matches!(
        harness.worker().await,
        Err(WorkerError::Injected(InjectedErr("submit-tx")))
    ));
    // The following run must succeed and the transfer must go through
    harness.worker().await?;
    balance.expect(-11).await;
    harness.worker().await?;

    step("Test transfer all failures");
    harness.transfer(13).await;
    // Arm both failures: two consecutive worker runs must fail, first at
    // `init-tx` and then at `submit-tx`
    set_failure_scenario(&["init-tx", "submit-tx"]);
    assert!(matches!(
        harness.worker().await,
        Err(WorkerError::Injected(InjectedErr("init-tx")))
    ));
    assert!(matches!(
        harness.worker().await,
        Err(WorkerError::Injected(InjectedErr("submit-tx")))
    ));
    // The third run must succeed and the transfer must go through
    harness.worker().await?;
    balance.expect(-13).await;
    harness.worker().await?;

    step("Test recover successful bounces");
    // Send malformed transaction
    let code = harness
        .client_send(&format!("will be bounced {now}"), 2)
        .await;
    balance.expect(2).await;
    // Send the money back manually before the worker sees the transaction
    // NOTE(review): `code + 1` presumably is the code under which the exchange
    // sees the incoming transaction - confirm
    harness
        .exchange_send(&format!("bounced: {}", code + 1), 2)
        .await;
    balance.expect(-2).await;
    harness.worker().await?;

    step("Test recover failed bounces");
    // Send malformed transaction
    harness
        .client_send(&format!("will be failed bounced {now}"), 3)
        .await;
    // Wait for it to be received because rejected transactions take too much time to appear in the transaction log
    balance.expect(3).await;
    // Bounce it manually
    let received = harness.latest_tx(&harness.exchange).await;
    let bounce_code = harness
        .exchange_send_to(
            &format!("bounce manually: {}", received.code),
            3,
            &unknown_account,
        )
        .await;
    harness.expect_status(bounce_code, TxStatus::Rejected).await;
    // Should not bounce and catch the failure
    harness.worker().await?;
    // Wait for it to be bounced regardless because rejected transactions take too much time to appear in the transaction log
    // TODO fix this
    balance.expect(-3).await;

    step("Finish");
    // Final worker run after a pause; balances must stay unchanged
    tokio::time::sleep(Duration::from_secs(5)).await;
    harness.worker().await?;
    balance.expect(0).await;
    Ok(())
}
    511 
    512 /// Run online tests against real Magnet Bank backend
    513 async fn online_harness(config: &Config, reset: bool) -> anyhow::Result<()> {
    514     step("Run Magnet Bank online harness tests");
    515 
    516     step("Prepare db");
    517     let db_cfg = parse_db_cfg(config)?;
    518     let pool = pool(db_cfg.cfg, "magnet_bank").await?;
    519     let mut db = pool.acquire().await?.detach();
    520     dbinit(&mut db, db_cfg.sql_dir.as_ref(), "magnet-bank", reset).await?;
    521 
    522     let cfg = HarnessCfg::parse(config)?;
    523     let keys = setup::load(&cfg.worker)?;
    524     let client = http_client::client()?;
    525 
    526     let harness = Harness::new(&cfg, &client, &pool, &keys).await;
    527 
    528     step("Warmup worker");
    529     let _worker_task = {
    530         let client = client.clone();
    531         let pool = pool.clone();
    532         let config = config.clone();
    533         tokio::spawn(async move { run_worker(&config, &pool, &client, false).await })
    534     };
    535     tokio::time::sleep(Duration::from_secs(25)).await;
    536 
    537     let now = Timestamp::now();
    538     let balance = &mut Balances::new(&harness).await;
    539 
    540     step("Test incoming transactions");
    541     let reserve_pub = EddsaPublicKey::rand();
    542     harness
    543         .client_send(&format!("Taler {reserve_pub}"), 3)
    544         .await;
    545     harness
    546         .client_send(&format!("Malformed test {now}"), 4)
    547         .await;
    548     balance.expect(3).await;
    549     harness.expect_incoming(reserve_pub).await;
    550 
    551     step("Test outgoing transactions");
    552     let transfer_self = harness
    553         .custom_transfer(1, FullHuPayto::new(harness.exchange.iban.clone(), "Self"))
    554         .await;
    555     let transfer_id = harness
    556         .custom_transfer(2, FullHuPayto::new(harness.client.iban.clone(), "Client"))
    557         .await;
    558     balance.expect(-2).await;
    559     harness
    560         .expect_transfer_status(
    561             transfer_self,
    562             TransferState::permanent_failure,
    563             Some("409 FORRAS_SZAMLA_ESZAMLA_EGYEZIK 'A forrás és az ellenszámla egyezik!'"),
    564         )
    565         .await;
    566     harness
    567         .expect_transfer_status(transfer_id, TransferState::success, None)
    568         .await;
    569 
    570     step("Finish");
    571     tokio::time::sleep(Duration::from_secs(5)).await;
    572     balance.expect(0).await;
    573 
    574     Ok(())
    575 }
    576 
    577 fn main() {
    578     let args = Args::parse();
    579     taler_main(CONFIG_SOURCE, args.common, async |cfg| match args.cmd {
    580         Command::Logic { reset } => logic_harness(&cfg, reset).await,
    581         Command::Online { reset } => online_harness(&cfg, reset).await,
    582     });
    583 }