taler-rust

GNU Taler code in Rust. Largely core banking integrations.
Log | Files | Refs | Submodules | README | LICENSE

cyclos-harness.rs (19123B)


      1 /*
      2   This file is part of TALER
      3   Copyright (C) 2025, 2026 Taler Systems SA
      4 
      5   TALER is free software; you can redistribute it and/or modify it under the
      6   terms of the GNU Affero General Public License as published by the Free Software
      7   Foundation; either version 3, or (at your option) any later version.
      8 
      9   TALER is distributed in the hope that it will be useful, but WITHOUT ANY
     10   WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
     11   A PARTICULAR PURPOSE.  See the GNU Affero General Public License for more details.
     12 
     13   You should have received a copy of the GNU Affero General Public License along with
     14   TALER; see the file COPYING.  If not, see <http://www.gnu.org/licenses/>
     15 */
     16 
     17 use std::time::Duration;
     18 
     19 use clap::Parser as _;
     20 use compact_str::CompactString;
     21 use failure_injection::{InjectedErr, set_failure_scenario};
     22 use jiff::Timestamp;
     23 use owo_colors::OwoColorize as _;
     24 use sqlx::{PgPool, Row as _, postgres::PgRow};
     25 use taler_api::notification::dummy_listen;
     26 use taler_build::long_version;
     27 use taler_common::{
     28     CommonArgs,
     29     api_common::{EddsaPublicKey, HashCode, ShortHashCode},
     30     api_params::{History, Page},
     31     api_wire::{IncomingBankTransaction, TransferState},
     32     config::Config,
     33     taler_main,
     34     types::{
     35         amount::{Currency, Decimal, decimal},
     36         url,
     37     },
     38 };
     39 use taler_cyclos::{
     40     config::{AccountType, HarnessCfg},
     41     constants::CONFIG_SOURCE,
     42     cyclos_api::{
     43         api::CyclosAuth,
     44         client::Client,
     45         types::{HistoryItem, OrderBy},
     46     },
     47     db::{self, TransferResult, dbinit},
     48     payto::FullCyclosPayto,
     49     setup,
     50     worker::{Worker, WorkerError, WorkerResult, run_worker},
     51 };
     52 
/// Cyclos Adapter harness test suite
// Command-line arguments of the harness binary; parsed in `main` with
// `Args::parse()`. Plain `//` comments are used for the notes below because
// clap's derive macros read `///` comments as help text.
#[derive(clap::Parser, Debug)]
#[command(long_version = long_version(), about, long_about = None)]
struct Args {
    // Shared options, flattened into this parser and handed to `taler_main`.
    #[clap(flatten)]
    common: CommonArgs,

    // Test suite to run, dispatched in `main`.
    #[command(subcommand)]
    cmd: Command,
}
     63 
// Test suites selectable from the command line. `main` maps `Logic` to
// `logic_harness` and `Online` to `online_harness`.
#[derive(clap::Subcommand, Debug)]
enum Command {
    /// Run logic tests
    Logic {
        // Forwarded as the `reset` argument of `dbinit` and `setup::setup`.
        #[clap(long, short)]
        reset: bool,
    },
    /// Run online tests
    Online {
        // Forwarded as the `reset` argument of `dbinit` and `setup::setup`.
        #[clap(long, short)]
        reset: bool,
    },
}
     77 
     78 fn step(step: &str) {
     79     println!("{}", step.green());
     80 }
     81 
/// Shared state of a harness run: database pool, one Cyclos API client per
/// test account and the identifiers needed to drive the worker.
struct Harness<'a> {
    /// Database pool used for the worker connection and for direct queries
    pool: &'a PgPool,
    /// API client for the account that plays the customer side of the tests
    client: Client<'a>,
    /// API client for the exchange account, the one handed to the worker
    wire: Client<'a>,
    /// Payto of the `client` account, built from its `whoami()` answer
    client_payto: FullCyclosPayto,
    /// Payto of the `wire` account, built from its `whoami()` answer
    wire_payto: FullCyclosPayto,
    /// Payment type used for direct payments and given to the worker
    payment_type_id: i64,
    /// Account type given to the worker
    account_type_id: i64,
    /// Currency passed to the worker and to database history queries
    currency: Currency,
    /// Root value passed to payto construction and database history queries
    root: CompactString,
}
     93 
     94 impl<'a> Harness<'a> {
     95     async fn balance(&self) -> (Decimal, Decimal) {
     96         let (exchange, client) =
     97             tokio::try_join!(self.wire.accounts(), self.client.accounts()).unwrap();
     98         (
     99             exchange[0]
    100                 .status
    101                 .available_balance
    102                 .unwrap_or(exchange[0].status.balance),
    103             client[0]
    104                 .status
    105                 .available_balance
    106                 .unwrap_or(client[0].status.balance),
    107         )
    108     }
    109 
    110     /// Send transaction from client to exchange
    111     async fn client_send(&self, subject: &str, amount: Decimal) -> i64 {
    112         *self
    113             .client
    114             .direct_payment(*self.wire_payto.id, self.payment_type_id, amount, subject)
    115             .await
    116             .unwrap()
    117             .id
    118     }
    119 
    120     /// Send transaction from exchange to client
    121     async fn exchange_send(&self, subject: &str, amount: Decimal) -> i64 {
    122         *self
    123             .wire
    124             .direct_payment(*self.client_payto.id, self.payment_type_id, amount, subject)
    125             .await
    126             .unwrap()
    127             .id
    128     }
    129 
    130     /// Chargeback a transfer
    131     async fn chargeback(&self, id: i64) -> i64 {
    132         self.client.chargeback(id).await.unwrap()
    133     }
    134 
    135     /// Fetch last transfer related to client
    136     async fn client_last_transfer(&self) -> HistoryItem {
    137         self.client
    138             .history(*self.client_payto.id, OrderBy::DateDesc, 0, None)
    139             .await
    140             .unwrap()
    141             .page
    142             .remove(0)
    143     }
    144 
    145     /// Run the worker once
    146     async fn worker(&'a self) -> WorkerResult {
    147         let db = &mut self.pool.acquire().await.unwrap().detach();
    148         Worker {
    149             db,
    150             currency: self.currency.clone(),
    151             client: &self.wire,
    152             account_type: AccountType::Exchange,
    153             account_type_id: self.account_type_id,
    154             payment_type_id: self.payment_type_id,
    155         }
    156         .run()
    157         .await
    158     }
    159 
    160     async fn expect_incoming(&self, key: EddsaPublicKey) {
    161         let transfer = db::incoming_history(
    162             self.pool,
    163             &History {
    164                 page: Page {
    165                     limit: -1,
    166                     offset: None,
    167                 },
    168                 timeout_ms: None,
    169             },
    170             &self.currency,
    171             &self.root,
    172             dummy_listen,
    173         )
    174         .await
    175         .unwrap();
    176         assert!(matches!(
    177             transfer.first().unwrap(),
    178             IncomingBankTransaction::Reserve { reserve_pub, .. } if *reserve_pub == key,
    179         ));
    180     }
    181 
    182     async fn custom_transfer(&self, amount: Decimal, creditor_id: i64, creditor_name: &str) -> u64 {
    183         let res = db::make_transfer(
    184             self.pool,
    185             &db::Transfer {
    186                 request_uid: HashCode::rand(),
    187                 amount,
    188                 exchange_base_url: url("https://test.com"),
    189                 metadata: None,
    190                 wtid: ShortHashCode::rand(),
    191                 creditor_id,
    192                 creditor_name: CompactString::new(creditor_name),
    193             },
    194             &Timestamp::now(),
    195         )
    196         .await
    197         .unwrap();
    198         match res {
    199             TransferResult::Success { id, .. } => id,
    200             TransferResult::RequestUidReuse | TransferResult::WtidReuse => unreachable!(),
    201         }
    202     }
    203 
    204     async fn transfer(&self, amount: Decimal) -> u64 {
    205         self.custom_transfer(amount, *self.client_payto.id, &self.client_payto.name)
    206             .await
    207     }
    208 
    209     async fn transfer_id(&self, transfer_id: u64) -> i64 {
    210         sqlx::query(
    211             "SELECT transfer_id 
    212                 FROM transfer 
    213                     JOIN initiated USING (initiated_id)
    214                     JOIN tx_out USING (tx_out_id)
    215                 WHERE initiated_id=$1",
    216         )
    217         .bind(transfer_id as i64)
    218         .try_map(|r: PgRow| r.try_get(0))
    219         .fetch_one(self.pool)
    220         .await
    221         .unwrap()
    222     }
    223 
    224     async fn expect_transfer_status(&self, id: u64, status: TransferState, msg: Option<&str>) {
    225         let mut attempts = 0;
    226         loop {
    227             let transfer = db::transfer_by_id(self.pool, id, &self.currency, &self.root)
    228                 .await
    229                 .unwrap()
    230                 .unwrap();
    231             if (transfer.status, transfer.status_msg.as_deref()) == (status, msg) {
    232                 return;
    233             }
    234             if attempts > 40 {
    235                 assert_eq!(
    236                     (transfer.status, transfer.status_msg.as_deref()),
    237                     (status, msg)
    238                 );
    239             }
    240             attempts += 1;
    241             tokio::time::sleep(Duration::from_millis(200)).await;
    242         }
    243     }
    244 }
    245 
/// Expected balances of the exchange and client accounts, updated by each
/// `expect_*` call and compared against what the backend reports.
struct Balances<'a> {
    /// Harness used to fetch the current balances (despite the field name,
    /// this is the whole harness, not an API client)
    client: &'a Harness<'a>,
    /// Balance the exchange account is expected to have
    exchange_balance: Decimal,
    /// Balance the client account is expected to have
    client_balance: Decimal,
}
    251 
    252 impl<'a> Balances<'a> {
    253     pub async fn new(client: &'a Harness<'a>) -> Self {
    254         let (exchange_balance, client_balance) = client.balance().await;
    255         Self {
    256             client,
    257             exchange_balance,
    258             client_balance,
    259         }
    260     }
    261 
    262     async fn expect_add(&mut self, diff: Decimal) {
    263         self.exchange_balance = self.exchange_balance.try_add(&diff).unwrap();
    264         self.client_balance = self.client_balance.try_sub(&diff).unwrap();
    265         let mut attempts = 0;
    266         loop {
    267             let current = self.client.balance().await;
    268             if current == (self.exchange_balance, self.client_balance) {
    269                 return;
    270             }
    271             if attempts > 40 {
    272                 assert_eq!(
    273                     current,
    274                     (self.exchange_balance, self.client_balance),
    275                     "({} {}) +{diff}",
    276                     current.0,
    277                     current.1
    278                 );
    279             }
    280             attempts += 1;
    281             tokio::time::sleep(Duration::from_millis(200)).await;
    282         }
    283     }
    284 
    285     async fn expect_sub(&mut self, diff: Decimal) {
    286         self.exchange_balance = self.exchange_balance.try_sub(&diff).unwrap();
    287         self.client_balance = self.client_balance.try_add(&diff).unwrap();
    288 
    289         let mut attempts = 0;
    290         loop {
    291             let current = self.client.balance().await;
    292             if current == (self.exchange_balance, self.client_balance) {
    293                 return;
    294             }
    295             if attempts > 40 {
    296                 assert_eq!(
    297                     current,
    298                     (self.exchange_balance, self.client_balance),
    299                     "({} {}) -{diff}",
    300                     current.0,
    301                     current.1
    302                 );
    303             }
    304             attempts += 1;
    305             tokio::time::sleep(Duration::from_millis(200)).await;
    306         }
    307     }
    308 }
    309 
/// Run logic tests against local Cyclos backend
///
/// The worker is driven by hand, one pass at a time through
/// `Harness::worker`, so every scenario can check the database state and the
/// account balances between two passes. Failures are injected with
/// `set_failure_scenario`.
///
/// Returns the first setup or worker error; failed expectations panic.
async fn logic_harness(cfg: &Config, reset: bool) -> anyhow::Result<()> {
    step("Run Cyclos logic harness tests");

    step("Prepare db");
    let pool = dbinit(cfg, reset).await?;

    let client = http_client::client()?;
    setup::setup(cfg, reset, &client).await?;
    // From here on `cfg` is the parsed harness configuration
    let cfg = HarnessCfg::parse(cfg)?;
    // API client authenticated with the worker host credentials
    let wire = Client {
        client: &client,
        api_url: &cfg.worker.host.api_url,
        auth: &CyclosAuth::Basic {
            username: cfg.worker.host.username,
            password: cfg.worker.host.password,
        },
    };
    // API client authenticated with the harness credentials; shadows the
    // HTTP client it borrows
    let client = Client {
        client: &client,
        api_url: &cfg.worker.host.api_url,
        auth: &CyclosAuth::Basic {
            username: cfg.username,
            password: cfg.password,
        },
    };
    // Both payto are resolved first because the fields are evaluated in
    // order and `client` / `wire` are moved into the struct right after
    let harness = Harness {
        pool: &pool,
        client_payto: client
            .whoami()
            .await
            .unwrap()
            .payto(cfg.worker.root.clone()),
        wire_payto: wire.whoami().await.unwrap().payto(cfg.worker.root.clone()),
        client,
        wire,
        currency: cfg.worker.currency,
        root: cfg.worker.root,
        payment_type_id: *cfg.worker.payment_type_id,
        account_type_id: *cfg.worker.account_type_id,
    };

    step("Warmup");
    harness.worker().await.unwrap();
    // Interpolated into free-form subjects below; presumably keeps them
    // distinct from one run to the next
    let now = Timestamp::now();
    // Reference balances, taken once the warmup pass is done
    let balance = &mut Balances::new(&harness).await;

    step("Test incoming talerable transaction");
    // Send talerable transaction
    let reserve_pub = EddsaPublicKey::rand();
    let amount = decimal("3.3");
    harness
        .client_send(&format!("Taler {reserve_pub}"), amount)
        .await;
    // Sync and register
    harness.worker().await?;
    harness.expect_incoming(reserve_pub).await;
    balance.expect_add(amount).await;

    step("Test incoming malformed transaction");
    // Send malformed transaction
    let amount = decimal("3.4");
    harness
        .client_send(&format!("Malformed test {now}"), amount)
        .await;
    balance.expect_add(amount).await;
    // Sync and bounce
    harness.worker().await?;
    balance.expect_sub(amount).await;

    step("Test transfer transactions");
    let amount = decimal("3.5");
    // Init a transfer to client
    let transfer_id = harness.transfer(amount).await;
    // Check transfer pending
    harness
        .expect_transfer_status(transfer_id, TransferState::pending, None)
        .await;
    // Should send
    harness.worker().await?;
    // Wait for transaction to finalize
    balance.expect_sub(amount).await;
    // Should register
    harness.worker().await?;
    // Check transfer is now successful
    harness
        .expect_transfer_status(transfer_id, TransferState::success, None)
        .await;

    step("Test transfer to self");
    // Init a transfer to self
    let transfer_id = harness
        .custom_transfer(
            decimal("10.1"),
            *harness.wire_payto.id,
            &harness.wire_payto.name,
        )
        .await;
    // Should fail
    harness.worker().await?;
    // Check transfer failed
    harness
        .expect_transfer_status(
            transfer_id,
            TransferState::permanent_failure,
            Some("permissionDenied - The operation was denied because a required permission was not granted"),
        )
        .await;

    step("Test transfer to unknown account");
    // Init a transfer to unknown
    let transfer_id = harness
        .custom_transfer(decimal("10.1"), 42, "Unknown")
        .await;
    // Should fail
    harness.worker().await?;
    // Check transfer failed
    harness
        .expect_transfer_status(
            transfer_id,
            TransferState::permanent_failure,
            Some("unknown BasicUser 42"),
        )
        .await;

    step("Test unexpected outgoing");
    // Manual tx from the exchange
    let amount = decimal("4");
    harness
        .exchange_send(&format!("What is this ? {now}"), amount)
        .await;
    harness.worker().await?;
    // Wait for transaction to finalize
    balance.expect_sub(amount).await;
    harness.worker().await?;

    step("Test transfer chargeback");
    let amount = decimal("10.1");
    // Init a transfer to client
    let transfer_id = harness.transfer(amount).await;
    harness
        .expect_transfer_status(transfer_id, TransferState::pending, None)
        .await;
    // Send
    harness.worker().await?;
    balance.expect_sub(amount).await;
    // Still pending until the next pass registers it
    harness
        .expect_transfer_status(transfer_id, TransferState::pending, None)
        .await;
    // Sync
    harness.worker().await?;
    harness
        .expect_transfer_status(transfer_id, TransferState::success, None)
        .await;
    // Chargeback
    harness
        .chargeback(harness.transfer_id(transfer_id).await)
        .await;
    balance.expect_add(amount).await;
    harness.worker().await?;
    harness
        .expect_transfer_status(
            transfer_id,
            TransferState::late_failure,
            Some("charged back"),
        )
        .await;

    step("Test recover unexpected chargeback");
    let amount = decimal("10.2");
    // Manual tx from the exchange
    harness
        .exchange_send(&format!("What is this chargebacked ? {now}"), amount)
        .await;
    balance.expect_sub(amount).await;
    // Chargeback
    harness
        .chargeback(*harness.client_last_transfer().await.id)
        .await;
    balance.expect_add(amount).await;
    // Sync
    harness.worker().await?;

    step("Test direct-payment failure");
    let amount = decimal("10.3");
    harness.transfer(amount).await;
    // The next pass must stop on the injected error
    set_failure_scenario(&["direct-payment"]);
    assert!(matches!(
        harness.worker().await.unwrap_err(),
        WorkerError::Injected(InjectedErr("direct-payment"))
    ));
    // Recover: this pass is expected to succeed and the funds to leave
    harness.worker().await?;
    balance.expect_sub(amount).await;
    harness.worker().await?;

    step("Test chargeback failure");
    // Send malformed transaction
    let amount = decimal("10.4");
    harness
        .client_send(&format!("Malformed test {now} with failure"), amount)
        .await;
    balance.expect_add(amount).await;
    // Sync and bounce
    set_failure_scenario(&["chargeback"]);
    assert!(matches!(
        harness.worker().await.unwrap_err(),
        WorkerError::Injected(InjectedErr("chargeback"))
    ));
    // The funds are expected back on the client side although the pass
    // returned an error
    balance.expect_sub(amount).await;
    // Sync recover
    harness.worker().await?;

    step("Finish");
    harness.worker().await?;
    // A last pass must leave the balances untouched
    balance.expect_add(Decimal::zero()).await;
    Ok(())
}
    527 
/// Run online tests against real Cyclos backend
///
/// Unlike the logic tests, the worker is not driven by hand: `run_worker` is
/// spawned as a background task and the test only polls balances and
/// transfer states until they match the expectations.
///
/// Returns the first setup error; failed expectations panic.
async fn online_harness(config: &Config, reset: bool) -> anyhow::Result<()> {
    step("Run Cyclos online harness tests");

    step("Prepare db");
    let pool = dbinit(config, reset).await?;
    let http_client = http_client::client()?;
    setup::setup(config, reset, &http_client).await?;
    let cfg = HarnessCfg::parse(config)?;
    // API client authenticated with the worker host credentials
    let wire = Client {
        client: &http_client,
        api_url: &cfg.worker.host.api_url,
        auth: &CyclosAuth::Basic {
            username: cfg.worker.host.username,
            password: cfg.worker.host.password,
        },
    };
    // API client authenticated with the harness credentials
    let client = Client {
        client: &http_client,
        api_url: &cfg.worker.host.api_url,
        auth: &CyclosAuth::Basic {
            username: cfg.username,
            password: cfg.password,
        },
    };

    // Both payto are resolved first because the fields are evaluated in
    // order and `client` / `wire` are moved into the struct right after
    let harness = Harness {
        pool: &pool,
        client_payto: client
            .whoami()
            .await
            .unwrap()
            .payto(cfg.worker.root.clone()),
        wire_payto: wire.whoami().await.unwrap().payto(cfg.worker.root.clone()),
        client,
        wire,
        currency: cfg.worker.currency,
        root: cfg.worker.root,
        payment_type_id: *cfg.worker.payment_type_id,
        account_type_id: *cfg.worker.account_type_id,
    };

    step("Warmup worker");
    // Background worker working on its own clones of the HTTP client, pool
    // and config; the handle is kept but never awaited
    let _worker_task = {
        let client = http_client.clone();
        let pool = pool.clone();
        let config = config.clone();
        tokio::spawn(async move { run_worker(&config, &pool, &client, false).await })
    };
    // Give the worker some time before taking the reference balances
    tokio::time::sleep(Duration::from_secs(5)).await;
    // Interpolated into the malformed subject below; presumably keeps it
    // distinct from one run to the next
    let now = Timestamp::now();
    let balance = &mut Balances::new(&harness).await;

    step("Test incoming transactions");
    let taler_amount = decimal("3");
    let malformed_amount = decimal("4");
    let reserve_pub = EddsaPublicKey::rand();
    harness
        .client_send(&format!("Taler {reserve_pub}"), taler_amount)
        .await;
    harness
        .client_send(&format!("Malformed test {now}"), malformed_amount)
        .await;
    // Only the talerable amount is expected to stay on the exchange side
    balance.expect_add(taler_amount).await;
    harness.expect_incoming(reserve_pub).await;

    step("Test outgoing transactions");
    let self_amount = decimal("1");
    let taler_amount = decimal("2");

    // Transfer to the exchange account itself, expected to be refused
    let transfer_self = harness
        .custom_transfer(
            self_amount,
            *harness.wire_payto.id,
            &harness.wire_payto.name,
        )
        .await;
    let transfer_id = harness.transfer(taler_amount).await;
    // Only the transfer to the client is expected to change the balances
    balance.expect_sub(taler_amount).await;
    harness
        .expect_transfer_status(
            transfer_self,
            TransferState::permanent_failure,
            Some("permissionDenied - The operation was denied because a required permission was not granted"),
        )
        .await;
    harness
        .expect_transfer_status(transfer_id, TransferState::success, None)
        .await;

    step("Finish");
    // Leave the worker some more time, then check nothing else moved
    tokio::time::sleep(Duration::from_secs(5)).await;
    balance.expect_add(Decimal::zero()).await;

    Ok(())
}
    624 
    625 fn main() {
    626     let args = Args::parse();
    627     taler_main(CONFIG_SOURCE, args.common, async |cfg| match args.cmd {
    628         Command::Logic { reset } => logic_harness(&cfg, reset).await,
    629         Command::Online { reset } => online_harness(&cfg, reset).await,
    630     });
    631 }