cyclos-harness.rs (19145B)
1 /* 2 This file is part of TALER 3 Copyright (C) 2025, 2026 Taler Systems SA 4 5 TALER is free software; you can redistribute it and/or modify it under the 6 terms of the GNU Affero General Public License as published by the Free Software 7 Foundation; either version 3, or (at your option) any later version. 8 9 TALER is distributed in the hope that it will be useful, but WITHOUT ANY 10 WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR 11 A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. 12 13 You should have received a copy of the GNU Affero General Public License along with 14 TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> 15 */ 16 17 use std::time::Duration; 18 19 use clap::Parser as _; 20 use compact_str::CompactString; 21 use failure_injection::{InjectedErr, set_failure_scenario}; 22 use jiff::Timestamp; 23 use owo_colors::OwoColorize as _; 24 use sqlx::{PgPool, Row as _, postgres::PgRow}; 25 use taler_api::notification::dummy_listen; 26 use taler_build::long_version; 27 use taler_common::{ 28 CommonArgs, 29 api::{ 30 EddsaPublicKey, HashCode, ShortHashCode, 31 params::{History, Page, Pooling}, 32 wire::{IncomingBankTransaction, TransferState}, 33 }, 34 config::Config, 35 taler_main, 36 types::{ 37 amount::{Currency, Decimal, decimal}, 38 url, 39 }, 40 }; 41 use taler_cyclos::{ 42 config::{AccountType, HarnessCfg}, 43 constants::CONFIG_SOURCE, 44 cyclos_api::{ 45 api::CyclosAuth, 46 client::Client, 47 types::{HistoryItem, OrderBy}, 48 }, 49 db::{self, TransferResult, dbinit}, 50 payto::FullCyclosPayto, 51 setup, 52 worker::{Worker, WorkerError, WorkerResult, run_worker}, 53 }; 54 55 /// Cyclos Adapter harness test suite 56 #[derive(clap::Parser, Debug)] 57 #[command(long_version = long_version(), about, long_about = None)] 58 struct Args { 59 #[clap(flatten)] 60 common: CommonArgs, 61 62 #[command(subcommand)] 63 cmd: Command, 64 } 65 66 #[derive(clap::Subcommand, Debug)] 67 enum Command { 68 /// Run logic tests 69 Logic { 70 #[arg(short, long)] 71 reset: bool, 72 }, 73 /// Run online tests 74 Online { 75 #[arg(short, long)] 76 reset: bool, 77 }, 78 } 79 80 fn step(step: &str) { 81 println!("{}", step.green()); 82 } 83 84 struct Harness<'a> { 85 pool: &'a PgPool, 86 client: Client<'a>, 87 wire: Client<'a>, 88 client_payto: FullCyclosPayto, 89 wire_payto: FullCyclosPayto, 90 payment_type_id: i64, 91 account_type_id: i64, 92 currency: Currency, 93 root: CompactString, 94 } 95 96 impl<'a> Harness<'a> { 97 async fn balance(&self) -> (Decimal, Decimal) { 98 let (exchange, client) = 99 tokio::try_join!(self.wire.accounts(), self.client.accounts()).unwrap(); 100 ( 101 exchange[0] 102 .status 103 .available_balance 104 .unwrap_or(exchange[0].status.balance), 105 client[0] 106 .status 107 .available_balance 108 .unwrap_or(client[0].status.balance), 109 ) 110 } 111 112 /// Send transaction from client to exchange 113 async fn client_send(&self, subject: &str, amount: Decimal) -> i64 { 114 *self 115 .client 116 .direct_payment(*self.wire_payto.id, self.payment_type_id, amount, subject) 117 .await 118 .unwrap() 119 .id 120 } 121 122 /// Send transaction from exchange to client 123 async fn exchange_send(&self, subject: &str, amount: Decimal) -> i64 { 124 *self 125 .wire 126 .direct_payment(*self.client_payto.id, self.payment_type_id, amount, subject) 127 .await 128 .unwrap() 129 .id 130 } 131 132 /// Chargeback a transfer 133 async fn chargeback(&self, id: i64) -> i64 { 134 self.client.chargeback(id).await.unwrap() 135 } 136 137 /// Fetch last transfer related to client 138 async fn client_last_transfer(&self) -> HistoryItem { 139 self.client 140 .history(*self.client_payto.id, OrderBy::DateDesc, 0, None) 141 .await 142 .unwrap() 143 .page 144 .remove(0) 145 } 146 147 /// Run the worker once 148 async fn worker(&'a self) -> WorkerResult { 149 let db = &mut self.pool.acquire().await.unwrap().detach(); 150 Worker { 151 db, 152 currency: self.currency, 153 client: &self.wire, 154 account_type: AccountType::Exchange, 155 account_type_id: self.account_type_id, 156 payment_type_id: self.payment_type_id, 157 } 158 .run() 159 .await 160 } 161 162 async fn expect_incoming(&self, key: EddsaPublicKey) { 163 let transfer = db::incoming_history( 164 self.pool, 165 &History { 166 page: Page { 167 limit: -1, 168 offset: None, 169 }, 170 pooling: Pooling { timeout_ms: None }, 171 }, 172 &self.currency, 173 &self.root, 174 dummy_listen, 175 ) 176 .await 177 .unwrap(); 178 assert!(matches!( 179 transfer.first().unwrap(), 180 IncomingBankTransaction::Reserve { reserve_pub, .. } if *reserve_pub == key, 181 )); 182 } 183 184 async fn custom_transfer(&self, amount: Decimal, creditor_id: i64, creditor_name: &str) -> u64 { 185 let res = db::make_transfer( 186 self.pool, 187 &db::Transfer { 188 request_uid: HashCode::rand(), 189 amount, 190 exchange_base_url: url("https://test.com"), 191 metadata: None, 192 wtid: ShortHashCode::rand(), 193 creditor_id, 194 creditor_name: CompactString::new(creditor_name), 195 }, 196 &Timestamp::now(), 197 ) 198 .await 199 .unwrap(); 200 match res { 201 TransferResult::Success { id, .. } => id, 202 TransferResult::RequestUidReuse | TransferResult::WtidReuse => unreachable!(), 203 } 204 } 205 206 async fn transfer(&self, amount: Decimal) -> u64 { 207 self.custom_transfer(amount, *self.client_payto.id, &self.client_payto.name) 208 .await 209 } 210 211 async fn transfer_id(&self, transfer_id: u64) -> i64 { 212 sqlx::query( 213 "SELECT transfer_id 214 FROM transfer 215 JOIN initiated USING (initiated_id) 216 JOIN tx_out USING (tx_out_id) 217 WHERE initiated_id=$1", 218 ) 219 .bind(transfer_id as i64) 220 .try_map(|r: PgRow| r.try_get(0)) 221 .fetch_one(self.pool) 222 .await 223 .unwrap() 224 } 225 226 async fn expect_transfer_status(&self, id: u64, status: TransferState, msg: Option<&str>) { 227 let mut attempts = 0; 228 loop { 229 let transfer = db::transfer_by_id(self.pool, id, &self.currency, &self.root) 230 .await 231 .unwrap() 232 .unwrap(); 233 if (transfer.status, transfer.status_msg.as_deref()) == (status, msg) { 234 return; 235 } 236 if attempts > 40 { 237 assert_eq!( 238 (transfer.status, transfer.status_msg.as_deref()), 239 (status, msg) 240 ); 241 } 242 attempts += 1; 243 tokio::time::sleep(Duration::from_millis(200)).await; 244 } 245 } 246 } 247 248 struct Balances<'a> { 249 client: &'a Harness<'a>, 250 exchange_balance: Decimal, 251 client_balance: Decimal, 252 } 253 254 impl<'a> Balances<'a> { 255 pub async fn new(client: &'a Harness<'a>) -> Self { 256 let (exchange_balance, client_balance) = client.balance().await; 257 Self { 258 client, 259 exchange_balance, 260 client_balance, 261 } 262 } 263 264 async fn expect_add(&mut self, diff: Decimal) { 265 self.exchange_balance = self.exchange_balance.try_add(&diff).unwrap(); 266 self.client_balance = self.client_balance.try_sub(&diff).unwrap(); 267 let mut attempts = 0; 268 loop { 269 let current = self.client.balance().await; 270 if current == (self.exchange_balance, self.client_balance) { 271 return; 272 } 273 if attempts > 40 { 274 assert_eq!( 275 current, 276 (self.exchange_balance, self.client_balance), 277 "({} {}) +{diff}", 278 current.0, 279 current.1 280 ); 281 } 282 attempts += 1; 283 tokio::time::sleep(Duration::from_millis(200)).await; 284 } 285 } 286 287 async fn expect_sub(&mut self, diff: Decimal) { 288 self.exchange_balance = self.exchange_balance.try_sub(&diff).unwrap(); 289 self.client_balance = self.client_balance.try_add(&diff).unwrap(); 290 291 let mut attempts = 0; 292 loop { 293 let current = self.client.balance().await; 294 if current == (self.exchange_balance, self.client_balance) { 295 return; 296 } 297 if attempts > 40 { 298 assert_eq!( 299 current, 300 (self.exchange_balance, self.client_balance), 301 "({} {}) -{diff}", 302 current.0, 303 current.1 304 ); 305 } 306 attempts += 1; 307 tokio::time::sleep(Duration::from_millis(200)).await; 308 } 309 } 310 } 311 312 /// Run logic tests against local Cyclos backend 313 async fn logic_harness(cfg: &Config, reset: bool) -> anyhow::Result<()> { 314 step("Run Cyclos logic harness tests"); 315 316 step("Prepare db"); 317 let pool = dbinit(cfg, reset).await?; 318 319 let client = http_client::client()?; 320 setup::setup(cfg, reset, &client).await?; 321 let cfg = HarnessCfg::parse(cfg)?; 322 let wire = Client { 323 client: &client, 324 api_url: &cfg.worker.host.api_url, 325 auth: &CyclosAuth::Basic { 326 username: cfg.worker.host.username, 327 password: cfg.worker.host.password, 328 }, 329 }; 330 let client = Client { 331 client: &client, 332 api_url: &cfg.worker.host.api_url, 333 auth: &CyclosAuth::Basic { 334 username: cfg.username, 335 password: cfg.password, 336 }, 337 }; 338 let harness = Harness { 339 pool: &pool, 340 client_payto: client 341 .whoami() 342 .await 343 .unwrap() 344 .payto(cfg.worker.root.clone()), 345 wire_payto: wire.whoami().await.unwrap().payto(cfg.worker.root.clone()), 346 client, 347 wire, 348 currency: cfg.worker.currency, 349 root: cfg.worker.root, 350 payment_type_id: *cfg.worker.payment_type_id, 351 account_type_id: *cfg.worker.account_type_id, 352 }; 353 354 step("Warmup"); 355 harness.worker().await.unwrap(); 356 let now = Timestamp::now(); 357 let balance = &mut Balances::new(&harness).await; 358 359 step("Test incoming talerable transaction"); 360 // Send talerable transaction 361 let reserve_pub = EddsaPublicKey::rand(); 362 let amount = decimal("3.3"); 363 harness 364 .client_send(&format!("Taler {reserve_pub}"), amount) 365 .await; 366 // Sync and register 367 harness.worker().await?; 368 harness.expect_incoming(reserve_pub).await; 369 balance.expect_add(amount).await; 370 371 step("Test incoming malformed transaction"); 372 // Send malformed transaction 373 let amount = decimal("3.4"); 374 harness 375 .client_send(&format!("Malformed test {now}"), amount) 376 .await; 377 balance.expect_add(amount).await; 378 // Sync and bounce 379 harness.worker().await?; 380 balance.expect_sub(amount).await; 381 382 step("Test transfer transactions"); 383 let amount = decimal("3.5"); 384 // Init a transfer to client 385 let transfer_id = harness.transfer(amount).await; 386 // Check transfer pending 387 harness 388 .expect_transfer_status(transfer_id, TransferState::pending, None) 389 .await; 390 // Should send 391 harness.worker().await?; 392 // Wait for transaction to finalize 393 balance.expect_sub(amount).await; 394 // Should register 395 harness.worker().await?; 396 // Check transfer is now successful 397 harness 398 .expect_transfer_status(transfer_id, TransferState::success, None) 399 .await; 400 401 step("Test transfer to self"); 402 // Init a transfer to self 403 let transfer_id = harness 404 .custom_transfer( 405 decimal("10.1"), 406 *harness.wire_payto.id, 407 &harness.wire_payto.name, 408 ) 409 .await; 410 // Should failed 411 harness.worker().await?; 412 // Check transfer failed 413 harness 414 .expect_transfer_status( 415 transfer_id, 416 TransferState::permanent_failure, 417 Some("permissionDenied - The operation was denied because a required permission was not granted"), 418 ) 419 .await; 420 421 step("Test transfer to unknown account"); 422 // Init a transfer to unknown 423 let transfer_id = harness 424 .custom_transfer(decimal("10.1"), 42, "Unknown") 425 .await; 426 // Should failed 427 harness.worker().await?; 428 // Check transfer failed 429 harness 430 .expect_transfer_status( 431 transfer_id, 432 TransferState::permanent_failure, 433 Some("unknown BasicUser 42"), 434 ) 435 .await; 436 437 step("Test unexpected outgoing"); 438 // Manual tx from the exchange 439 let amount = decimal("4"); 440 harness 441 .exchange_send(&format!("What is this ? {now}"), amount) 442 .await; 443 harness.worker().await?; 444 // Wait for transaction to finalize 445 balance.expect_sub(amount).await; 446 harness.worker().await?; 447 448 step("Test transfer chargeback"); 449 let amount = decimal("10.1"); 450 // Init a transfer to client 451 let transfer_id = harness.transfer(amount).await; 452 harness 453 .expect_transfer_status(transfer_id, TransferState::pending, None) 454 .await; 455 // Send 456 harness.worker().await?; 457 balance.expect_sub(amount).await; 458 harness 459 .expect_transfer_status(transfer_id, TransferState::pending, None) 460 .await; 461 // Sync 462 harness.worker().await?; 463 harness 464 .expect_transfer_status(transfer_id, TransferState::success, None) 465 .await; 466 // Chargeback 467 harness 468 .chargeback(harness.transfer_id(transfer_id).await) 469 .await; 470 balance.expect_add(amount).await; 471 harness.worker().await?; 472 harness 473 .expect_transfer_status( 474 transfer_id, 475 TransferState::late_failure, 476 Some("charged back"), 477 ) 478 .await; 479 480 step("Test recover unexpected chargeback"); 481 let amount = decimal("10.2"); 482 // Manual tx from the exchange 483 harness 484 .exchange_send(&format!("What is this chargebacked ? {now}"), amount) 485 .await; 486 balance.expect_sub(amount).await; 487 // Chargeback 488 harness 489 .chargeback(*harness.client_last_transfer().await.id) 490 .await; 491 balance.expect_add(amount).await; 492 // Sync 493 harness.worker().await?; 494 495 step("Test direct-payment failure"); 496 let amount = decimal("10.3"); 497 harness.transfer(amount).await; 498 set_failure_scenario(&["direct-payment"]); 499 assert!(matches!( 500 harness.worker().await.unwrap_err(), 501 WorkerError::Injected(InjectedErr("direct-payment")) 502 )); 503 harness.worker().await?; 504 balance.expect_sub(amount).await; 505 harness.worker().await?; 506 507 step("Test chargeback failure"); 508 // Send malformed transaction 509 let amount = decimal("10.4"); 510 harness 511 .client_send(&format!("Malformed test {now} with failure"), amount) 512 .await; 513 balance.expect_add(amount).await; 514 // Sync and bounce 515 set_failure_scenario(&["chargeback"]); 516 assert!(matches!( 517 harness.worker().await.unwrap_err(), 518 WorkerError::Injected(InjectedErr("chargeback")) 519 )); 520 balance.expect_sub(amount).await; 521 // Sync recover 522 harness.worker().await?; 523 524 step("Finish"); 525 harness.worker().await?; 526 balance.expect_add(Decimal::ZERO).await; 527 Ok(()) 528 } 529 530 /// Run online tests against real Cyclos backend 531 async fn online_harness(config: &Config, reset: bool) -> anyhow::Result<()> { 532 step("Run Cyclos online harness tests"); 533 534 step("Prepare db"); 535 let pool = dbinit(config, reset).await?; 536 let http_client = http_client::client()?; 537 setup::setup(config, reset, &http_client).await?; 538 let cfg = HarnessCfg::parse(config)?; 539 let wire = Client { 540 client: &http_client, 541 api_url: &cfg.worker.host.api_url, 542 auth: &CyclosAuth::Basic { 543 username: cfg.worker.host.username, 544 password: cfg.worker.host.password, 545 }, 546 }; 547 let client = Client { 548 client: &http_client, 549 api_url: &cfg.worker.host.api_url, 550 auth: &CyclosAuth::Basic { 551 username: cfg.username, 552 password: cfg.password, 553 }, 554 }; 555 556 let harness = Harness { 557 pool: &pool, 558 client_payto: client 559 .whoami() 560 .await 561 .unwrap() 562 .payto(cfg.worker.root.clone()), 563 wire_payto: wire.whoami().await.unwrap().payto(cfg.worker.root.clone()), 564 client, 565 wire, 566 currency: cfg.worker.currency, 567 root: cfg.worker.root, 568 payment_type_id: *cfg.worker.payment_type_id, 569 account_type_id: *cfg.worker.account_type_id, 570 }; 571 572 step("Warmup worker"); 573 let _worker_task = { 574 let client = http_client.clone(); 575 let pool = pool.clone(); 576 let config = config.clone(); 577 tokio::spawn(async move { run_worker(&config, &pool, &client, false).await }) 578 }; 579 tokio::time::sleep(Duration::from_secs(5)).await; 580 let now = Timestamp::now(); 581 let balance = &mut Balances::new(&harness).await; 582 583 step("Test incoming transactions"); 584 let taler_amount = decimal("3"); 585 let malformed_amount = decimal("4"); 586 let reserve_pub = EddsaPublicKey::rand(); 587 harness 588 .client_send(&format!("Taler {reserve_pub}"), taler_amount) 589 .await; 590 harness 591 .client_send(&format!("Malformed test {now}"), malformed_amount) 592 .await; 593 balance.expect_add(taler_amount).await; 594 harness.expect_incoming(reserve_pub).await; 595 596 step("Test outgoing transactions"); 597 let self_amount = decimal("1"); 598 let taler_amount = decimal("2"); 599 600 let transfer_self = harness 601 .custom_transfer( 602 self_amount, 603 *harness.wire_payto.id, 604 &harness.wire_payto.name, 605 ) 606 .await; 607 let transfer_id = harness.transfer(taler_amount).await; 608 balance.expect_sub(taler_amount).await; 609 harness 610 .expect_transfer_status( 611 transfer_self, 612 TransferState::permanent_failure, 613 Some("permissionDenied - The operation was denied because a required permission was not granted"), 614 ) 615 .await; 616 harness 617 .expect_transfer_status(transfer_id, TransferState::success, None) 618 .await; 619 620 step("Finish"); 621 tokio::time::sleep(Duration::from_secs(5)).await; 622 balance.expect_add(Decimal::ZERO).await; 623 624 Ok(()) 625 } 626 627 fn main() { 628 let args = Args::parse(); 629 taler_main(CONFIG_SOURCE, args.common, async |cfg| match args.cmd { 630 Command::Logic { reset } => logic_harness(&cfg, reset).await, 631 Command::Online { reset } => online_harness(&cfg, reset).await, 632 }); 633 }