taler-rust

GNU Taler code in Rust. Largely core banking integrations.
Log | Files | Refs | Submodules | README | LICENSE

cyclos-harness.rs (19145B)


      1 /*
      2   This file is part of TALER
      3   Copyright (C) 2025, 2026 Taler Systems SA
      4 
      5   TALER is free software; you can redistribute it and/or modify it under the
      6   terms of the GNU Affero General Public License as published by the Free Software
      7   Foundation; either version 3, or (at your option) any later version.
      8 
      9   TALER is distributed in the hope that it will be useful, but WITHOUT ANY
     10   WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
     11   A PARTICULAR PURPOSE.  See the GNU Affero General Public License for more details.
     12 
     13   You should have received a copy of the GNU Affero General Public License along with
     14   TALER; see the file COPYING.  If not, see <http://www.gnu.org/licenses/>
     15 */
     16 
     17 use std::time::Duration;
     18 
     19 use clap::Parser as _;
     20 use compact_str::CompactString;
     21 use failure_injection::{InjectedErr, set_failure_scenario};
     22 use jiff::Timestamp;
     23 use owo_colors::OwoColorize as _;
     24 use sqlx::{PgPool, Row as _, postgres::PgRow};
     25 use taler_api::notification::dummy_listen;
     26 use taler_build::long_version;
     27 use taler_common::{
     28     CommonArgs,
     29     api::{
     30         EddsaPublicKey, HashCode, ShortHashCode,
     31         params::{History, Page, Pooling},
     32         wire::{IncomingBankTransaction, TransferState},
     33     },
     34     config::Config,
     35     taler_main,
     36     types::{
     37         amount::{Currency, Decimal, decimal},
     38         url,
     39     },
     40 };
     41 use taler_cyclos::{
     42     config::{AccountType, HarnessCfg},
     43     constants::CONFIG_SOURCE,
     44     cyclos_api::{
     45         api::CyclosAuth,
     46         client::Client,
     47         types::{HistoryItem, OrderBy},
     48     },
     49     db::{self, TransferResult, dbinit},
     50     payto::FullCyclosPayto,
     51     setup,
     52     worker::{Worker, WorkerError, WorkerResult, run_worker},
     53 };
     54 
     55 /// Cyclos Adapter harness test suite
     56 #[derive(clap::Parser, Debug)]
     57 #[command(long_version = long_version(), about, long_about = None)]
     58 struct Args {
     59     #[clap(flatten)]
     60     common: CommonArgs,
     61 
     62     #[command(subcommand)]
     63     cmd: Command,
     64 }
     65 
     66 #[derive(clap::Subcommand, Debug)]
     67 enum Command {
     68     /// Run logic tests
     69     Logic {
     70         #[arg(short, long)]
     71         reset: bool,
     72     },
     73     /// Run online tests
     74     Online {
     75         #[arg(short, long)]
     76         reset: bool,
     77     },
     78 }
     79 
     80 fn step(step: &str) {
     81     println!("{}", step.green());
     82 }
     83 
     84 struct Harness<'a> {
     85     pool: &'a PgPool,
     86     client: Client<'a>,
     87     wire: Client<'a>,
     88     client_payto: FullCyclosPayto,
     89     wire_payto: FullCyclosPayto,
     90     payment_type_id: i64,
     91     account_type_id: i64,
     92     currency: Currency,
     93     root: CompactString,
     94 }
     95 
     96 impl<'a> Harness<'a> {
     97     async fn balance(&self) -> (Decimal, Decimal) {
     98         let (exchange, client) =
     99             tokio::try_join!(self.wire.accounts(), self.client.accounts()).unwrap();
    100         (
    101             exchange[0]
    102                 .status
    103                 .available_balance
    104                 .unwrap_or(exchange[0].status.balance),
    105             client[0]
    106                 .status
    107                 .available_balance
    108                 .unwrap_or(client[0].status.balance),
    109         )
    110     }
    111 
    112     /// Send transaction from client to exchange
    113     async fn client_send(&self, subject: &str, amount: Decimal) -> i64 {
    114         *self
    115             .client
    116             .direct_payment(*self.wire_payto.id, self.payment_type_id, amount, subject)
    117             .await
    118             .unwrap()
    119             .id
    120     }
    121 
    122     /// Send transaction from exchange to client
    123     async fn exchange_send(&self, subject: &str, amount: Decimal) -> i64 {
    124         *self
    125             .wire
    126             .direct_payment(*self.client_payto.id, self.payment_type_id, amount, subject)
    127             .await
    128             .unwrap()
    129             .id
    130     }
    131 
    132     /// Chargeback a transfer
    133     async fn chargeback(&self, id: i64) -> i64 {
    134         self.client.chargeback(id).await.unwrap()
    135     }
    136 
    137     /// Fetch last transfer related to client
    138     async fn client_last_transfer(&self) -> HistoryItem {
    139         self.client
    140             .history(*self.client_payto.id, OrderBy::DateDesc, 0, None)
    141             .await
    142             .unwrap()
    143             .page
    144             .remove(0)
    145     }
    146 
    147     /// Run the worker once
    148     async fn worker(&'a self) -> WorkerResult {
    149         let db = &mut self.pool.acquire().await.unwrap().detach();
    150         Worker {
    151             db,
    152             currency: self.currency,
    153             client: &self.wire,
    154             account_type: AccountType::Exchange,
    155             account_type_id: self.account_type_id,
    156             payment_type_id: self.payment_type_id,
    157         }
    158         .run()
    159         .await
    160     }
    161 
    162     async fn expect_incoming(&self, key: EddsaPublicKey) {
    163         let transfer = db::incoming_history(
    164             self.pool,
    165             &History {
    166                 page: Page {
    167                     limit: -1,
    168                     offset: None,
    169                 },
    170                 pooling: Pooling { timeout_ms: None },
    171             },
    172             &self.currency,
    173             &self.root,
    174             dummy_listen,
    175         )
    176         .await
    177         .unwrap();
    178         assert!(matches!(
    179             transfer.first().unwrap(),
    180             IncomingBankTransaction::Reserve { reserve_pub, .. } if *reserve_pub == key,
    181         ));
    182     }
    183 
    184     async fn custom_transfer(&self, amount: Decimal, creditor_id: i64, creditor_name: &str) -> u64 {
    185         let res = db::make_transfer(
    186             self.pool,
    187             &db::Transfer {
    188                 request_uid: HashCode::rand(),
    189                 amount,
    190                 exchange_base_url: url("https://test.com"),
    191                 metadata: None,
    192                 wtid: ShortHashCode::rand(),
    193                 creditor_id,
    194                 creditor_name: CompactString::new(creditor_name),
    195             },
    196             &Timestamp::now(),
    197         )
    198         .await
    199         .unwrap();
    200         match res {
    201             TransferResult::Success { id, .. } => id,
    202             TransferResult::RequestUidReuse | TransferResult::WtidReuse => unreachable!(),
    203         }
    204     }
    205 
    206     async fn transfer(&self, amount: Decimal) -> u64 {
    207         self.custom_transfer(amount, *self.client_payto.id, &self.client_payto.name)
    208             .await
    209     }
    210 
    211     async fn transfer_id(&self, transfer_id: u64) -> i64 {
    212         sqlx::query(
    213             "SELECT transfer_id
    214                 FROM transfer
    215                     JOIN initiated USING (initiated_id)
    216                     JOIN tx_out USING (tx_out_id)
    217                 WHERE initiated_id=$1",
    218         )
    219         .bind(transfer_id as i64)
    220         .try_map(|r: PgRow| r.try_get(0))
    221         .fetch_one(self.pool)
    222         .await
    223         .unwrap()
    224     }
    225 
    226     async fn expect_transfer_status(&self, id: u64, status: TransferState, msg: Option<&str>) {
    227         let mut attempts = 0;
    228         loop {
    229             let transfer = db::transfer_by_id(self.pool, id, &self.currency, &self.root)
    230                 .await
    231                 .unwrap()
    232                 .unwrap();
    233             if (transfer.status, transfer.status_msg.as_deref()) == (status, msg) {
    234                 return;
    235             }
    236             if attempts > 40 {
    237                 assert_eq!(
    238                     (transfer.status, transfer.status_msg.as_deref()),
    239                     (status, msg)
    240                 );
    241             }
    242             attempts += 1;
    243             tokio::time::sleep(Duration::from_millis(200)).await;
    244         }
    245     }
    246 }
    247 
    248 struct Balances<'a> {
    249     client: &'a Harness<'a>,
    250     exchange_balance: Decimal,
    251     client_balance: Decimal,
    252 }
    253 
    254 impl<'a> Balances<'a> {
    255     pub async fn new(client: &'a Harness<'a>) -> Self {
    256         let (exchange_balance, client_balance) = client.balance().await;
    257         Self {
    258             client,
    259             exchange_balance,
    260             client_balance,
    261         }
    262     }
    263 
    264     async fn expect_add(&mut self, diff: Decimal) {
    265         self.exchange_balance = self.exchange_balance.try_add(&diff).unwrap();
    266         self.client_balance = self.client_balance.try_sub(&diff).unwrap();
    267         let mut attempts = 0;
    268         loop {
    269             let current = self.client.balance().await;
    270             if current == (self.exchange_balance, self.client_balance) {
    271                 return;
    272             }
    273             if attempts > 40 {
    274                 assert_eq!(
    275                     current,
    276                     (self.exchange_balance, self.client_balance),
    277                     "({} {}) +{diff}",
    278                     current.0,
    279                     current.1
    280                 );
    281             }
    282             attempts += 1;
    283             tokio::time::sleep(Duration::from_millis(200)).await;
    284         }
    285     }
    286 
    287     async fn expect_sub(&mut self, diff: Decimal) {
    288         self.exchange_balance = self.exchange_balance.try_sub(&diff).unwrap();
    289         self.client_balance = self.client_balance.try_add(&diff).unwrap();
    290 
    291         let mut attempts = 0;
    292         loop {
    293             let current = self.client.balance().await;
    294             if current == (self.exchange_balance, self.client_balance) {
    295                 return;
    296             }
    297             if attempts > 40 {
    298                 assert_eq!(
    299                     current,
    300                     (self.exchange_balance, self.client_balance),
    301                     "({} {}) -{diff}",
    302                     current.0,
    303                     current.1
    304                 );
    305             }
    306             attempts += 1;
    307             tokio::time::sleep(Duration::from_millis(200)).await;
    308         }
    309     }
    310 }
    311 
    312 /// Run logic tests against local Cyclos backend
    313 async fn logic_harness(cfg: &Config, reset: bool) -> anyhow::Result<()> {
    314     step("Run Cyclos logic harness tests");
    315 
    316     step("Prepare db");
    317     let pool = dbinit(cfg, reset).await?;
    318 
    319     let client = http_client::client()?;
    320     setup::setup(cfg, reset, &client).await?;
    321     let cfg = HarnessCfg::parse(cfg)?;
    322     let wire = Client {
    323         client: &client,
    324         api_url: &cfg.worker.host.api_url,
    325         auth: &CyclosAuth::Basic {
    326             username: cfg.worker.host.username,
    327             password: cfg.worker.host.password,
    328         },
    329     };
    330     let client = Client {
    331         client: &client,
    332         api_url: &cfg.worker.host.api_url,
    333         auth: &CyclosAuth::Basic {
    334             username: cfg.username,
    335             password: cfg.password,
    336         },
    337     };
    338     let harness = Harness {
    339         pool: &pool,
    340         client_payto: client
    341             .whoami()
    342             .await
    343             .unwrap()
    344             .payto(cfg.worker.root.clone()),
    345         wire_payto: wire.whoami().await.unwrap().payto(cfg.worker.root.clone()),
    346         client,
    347         wire,
    348         currency: cfg.worker.currency,
    349         root: cfg.worker.root,
    350         payment_type_id: *cfg.worker.payment_type_id,
    351         account_type_id: *cfg.worker.account_type_id,
    352     };
    353 
    354     step("Warmup");
    355     harness.worker().await.unwrap();
    356     let now = Timestamp::now();
    357     let balance = &mut Balances::new(&harness).await;
    358 
    359     step("Test incoming talerable transaction");
    360     // Send talerable transaction
    361     let reserve_pub = EddsaPublicKey::rand();
    362     let amount = decimal("3.3");
    363     harness
    364         .client_send(&format!("Taler {reserve_pub}"), amount)
    365         .await;
    366     // Sync and register
    367     harness.worker().await?;
    368     harness.expect_incoming(reserve_pub).await;
    369     balance.expect_add(amount).await;
    370 
    371     step("Test incoming malformed transaction");
    372     // Send malformed transaction
    373     let amount = decimal("3.4");
    374     harness
    375         .client_send(&format!("Malformed test {now}"), amount)
    376         .await;
    377     balance.expect_add(amount).await;
    378     // Sync and bounce
    379     harness.worker().await?;
    380     balance.expect_sub(amount).await;
    381 
    382     step("Test transfer transactions");
    383     let amount = decimal("3.5");
    384     // Init a transfer to client
    385     let transfer_id = harness.transfer(amount).await;
    386     // Check transfer pending
    387     harness
    388         .expect_transfer_status(transfer_id, TransferState::pending, None)
    389         .await;
    390     // Should send
    391     harness.worker().await?;
    392     // Wait for transaction to finalize
    393     balance.expect_sub(amount).await;
    394     // Should register
    395     harness.worker().await?;
    396     // Check transfer is now successful
    397     harness
    398         .expect_transfer_status(transfer_id, TransferState::success, None)
    399         .await;
    400 
    401     step("Test transfer to self");
    402     // Init a transfer to self
    403     let transfer_id = harness
    404         .custom_transfer(
    405             decimal("10.1"),
    406             *harness.wire_payto.id,
    407             &harness.wire_payto.name,
    408         )
    409         .await;
    410     // Should failed
    411     harness.worker().await?;
    412     // Check transfer failed
    413     harness
    414         .expect_transfer_status(
    415             transfer_id,
    416             TransferState::permanent_failure,
    417             Some("permissionDenied - The operation was denied because a required permission was not granted"),
    418         )
    419         .await;
    420 
    421     step("Test transfer to unknown account");
    422     // Init a transfer to unknown
    423     let transfer_id = harness
    424         .custom_transfer(decimal("10.1"), 42, "Unknown")
    425         .await;
    426     // Should failed
    427     harness.worker().await?;
    428     // Check transfer failed
    429     harness
    430         .expect_transfer_status(
    431             transfer_id,
    432             TransferState::permanent_failure,
    433             Some("unknown BasicUser 42"),
    434         )
    435         .await;
    436 
    437     step("Test unexpected outgoing");
    438     // Manual tx from the exchange
    439     let amount = decimal("4");
    440     harness
    441         .exchange_send(&format!("What is this ? {now}"), amount)
    442         .await;
    443     harness.worker().await?;
    444     // Wait for transaction to finalize
    445     balance.expect_sub(amount).await;
    446     harness.worker().await?;
    447 
    448     step("Test transfer chargeback");
    449     let amount = decimal("10.1");
    450     // Init a transfer to client
    451     let transfer_id = harness.transfer(amount).await;
    452     harness
    453         .expect_transfer_status(transfer_id, TransferState::pending, None)
    454         .await;
    455     // Send
    456     harness.worker().await?;
    457     balance.expect_sub(amount).await;
    458     harness
    459         .expect_transfer_status(transfer_id, TransferState::pending, None)
    460         .await;
    461     // Sync
    462     harness.worker().await?;
    463     harness
    464         .expect_transfer_status(transfer_id, TransferState::success, None)
    465         .await;
    466     // Chargeback
    467     harness
    468         .chargeback(harness.transfer_id(transfer_id).await)
    469         .await;
    470     balance.expect_add(amount).await;
    471     harness.worker().await?;
    472     harness
    473         .expect_transfer_status(
    474             transfer_id,
    475             TransferState::late_failure,
    476             Some("charged back"),
    477         )
    478         .await;
    479 
    480     step("Test recover unexpected chargeback");
    481     let amount = decimal("10.2");
    482     // Manual tx from the exchange
    483     harness
    484         .exchange_send(&format!("What is this chargebacked ? {now}"), amount)
    485         .await;
    486     balance.expect_sub(amount).await;
    487     // Chargeback
    488     harness
    489         .chargeback(*harness.client_last_transfer().await.id)
    490         .await;
    491     balance.expect_add(amount).await;
    492     // Sync
    493     harness.worker().await?;
    494 
    495     step("Test direct-payment failure");
    496     let amount = decimal("10.3");
    497     harness.transfer(amount).await;
    498     set_failure_scenario(&["direct-payment"]);
    499     assert!(matches!(
    500         harness.worker().await.unwrap_err(),
    501         WorkerError::Injected(InjectedErr("direct-payment"))
    502     ));
    503     harness.worker().await?;
    504     balance.expect_sub(amount).await;
    505     harness.worker().await?;
    506 
    507     step("Test chargeback failure");
    508     // Send malformed transaction
    509     let amount = decimal("10.4");
    510     harness
    511         .client_send(&format!("Malformed test {now} with failure"), amount)
    512         .await;
    513     balance.expect_add(amount).await;
    514     // Sync and bounce
    515     set_failure_scenario(&["chargeback"]);
    516     assert!(matches!(
    517         harness.worker().await.unwrap_err(),
    518         WorkerError::Injected(InjectedErr("chargeback"))
    519     ));
    520     balance.expect_sub(amount).await;
    521     // Sync recover
    522     harness.worker().await?;
    523 
    524     step("Finish");
    525     harness.worker().await?;
    526     balance.expect_add(Decimal::ZERO).await;
    527     Ok(())
    528 }
    529 
    530 /// Run online tests against real Cyclos backend
    531 async fn online_harness(config: &Config, reset: bool) -> anyhow::Result<()> {
    532     step("Run Cyclos online harness tests");
    533 
    534     step("Prepare db");
    535     let pool = dbinit(config, reset).await?;
    536     let http_client = http_client::client()?;
    537     setup::setup(config, reset, &http_client).await?;
    538     let cfg = HarnessCfg::parse(config)?;
    539     let wire = Client {
    540         client: &http_client,
    541         api_url: &cfg.worker.host.api_url,
    542         auth: &CyclosAuth::Basic {
    543             username: cfg.worker.host.username,
    544             password: cfg.worker.host.password,
    545         },
    546     };
    547     let client = Client {
    548         client: &http_client,
    549         api_url: &cfg.worker.host.api_url,
    550         auth: &CyclosAuth::Basic {
    551             username: cfg.username,
    552             password: cfg.password,
    553         },
    554     };
    555 
    556     let harness = Harness {
    557         pool: &pool,
    558         client_payto: client
    559             .whoami()
    560             .await
    561             .unwrap()
    562             .payto(cfg.worker.root.clone()),
    563         wire_payto: wire.whoami().await.unwrap().payto(cfg.worker.root.clone()),
    564         client,
    565         wire,
    566         currency: cfg.worker.currency,
    567         root: cfg.worker.root,
    568         payment_type_id: *cfg.worker.payment_type_id,
    569         account_type_id: *cfg.worker.account_type_id,
    570     };
    571 
    572     step("Warmup worker");
    573     let _worker_task = {
    574         let client = http_client.clone();
    575         let pool = pool.clone();
    576         let config = config.clone();
    577         tokio::spawn(async move { run_worker(&config, &pool, &client, false).await })
    578     };
    579     tokio::time::sleep(Duration::from_secs(5)).await;
    580     let now = Timestamp::now();
    581     let balance = &mut Balances::new(&harness).await;
    582 
    583     step("Test incoming transactions");
    584     let taler_amount = decimal("3");
    585     let malformed_amount = decimal("4");
    586     let reserve_pub = EddsaPublicKey::rand();
    587     harness
    588         .client_send(&format!("Taler {reserve_pub}"), taler_amount)
    589         .await;
    590     harness
    591         .client_send(&format!("Malformed test {now}"), malformed_amount)
    592         .await;
    593     balance.expect_add(taler_amount).await;
    594     harness.expect_incoming(reserve_pub).await;
    595 
    596     step("Test outgoing transactions");
    597     let self_amount = decimal("1");
    598     let taler_amount = decimal("2");
    599 
    600     let transfer_self = harness
    601         .custom_transfer(
    602             self_amount,
    603             *harness.wire_payto.id,
    604             &harness.wire_payto.name,
    605         )
    606         .await;
    607     let transfer_id = harness.transfer(taler_amount).await;
    608     balance.expect_sub(taler_amount).await;
    609     harness
    610         .expect_transfer_status(
    611             transfer_self,
    612             TransferState::permanent_failure,
    613             Some("permissionDenied - The operation was denied because a required permission was not granted"),
    614         )
    615         .await;
    616     harness
    617         .expect_transfer_status(transfer_id, TransferState::success, None)
    618         .await;
    619 
    620     step("Finish");
    621     tokio::time::sleep(Duration::from_secs(5)).await;
    622     balance.expect_add(Decimal::ZERO).await;
    623 
    624     Ok(())
    625 }
    626 
    627 fn main() {
    628     let args = Args::parse();
    629     taler_main(CONFIG_SOURCE, args.common, async |cfg| match args.cmd {
    630         Command::Logic { reset } => logic_harness(&cfg, reset).await,
    631         Command::Online { reset } => online_harness(&cfg, reset).await,
    632     });
    633 }