cyclos-harness.rs (19123B)
1 /* 2 This file is part of TALER 3 Copyright (C) 2025, 2026 Taler Systems SA 4 5 TALER is free software; you can redistribute it and/or modify it under the 6 terms of the GNU Affero General Public License as published by the Free Software 7 Foundation; either version 3, or (at your option) any later version. 8 9 TALER is distributed in the hope that it will be useful, but WITHOUT ANY 10 WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR 11 A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. 12 13 You should have received a copy of the GNU Affero General Public License along with 14 TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> 15 */ 16 17 use std::time::Duration; 18 19 use clap::Parser as _; 20 use compact_str::CompactString; 21 use failure_injection::{InjectedErr, set_failure_scenario}; 22 use jiff::Timestamp; 23 use owo_colors::OwoColorize as _; 24 use sqlx::{PgPool, Row as _, postgres::PgRow}; 25 use taler_api::notification::dummy_listen; 26 use taler_build::long_version; 27 use taler_common::{ 28 CommonArgs, 29 api_common::{EddsaPublicKey, HashCode, ShortHashCode}, 30 api_params::{History, Page}, 31 api_wire::{IncomingBankTransaction, TransferState}, 32 config::Config, 33 taler_main, 34 types::{ 35 amount::{Currency, Decimal, decimal}, 36 url, 37 }, 38 }; 39 use taler_cyclos::{ 40 config::{AccountType, HarnessCfg}, 41 constants::CONFIG_SOURCE, 42 cyclos_api::{ 43 api::CyclosAuth, 44 client::Client, 45 types::{HistoryItem, OrderBy}, 46 }, 47 db::{self, TransferResult, dbinit}, 48 payto::FullCyclosPayto, 49 setup, 50 worker::{Worker, WorkerError, WorkerResult, run_worker}, 51 }; 52 53 /// Cyclos Adapter harness test suite 54 #[derive(clap::Parser, Debug)] 55 #[command(long_version = long_version(), about, long_about = None)] 56 struct Args { 57 #[clap(flatten)] 58 common: CommonArgs, 59 60 #[command(subcommand)] 61 cmd: Command, 62 } 63 64 #[derive(clap::Subcommand, Debug)] 65 enum Command { 66 /// Run logic tests 67 Logic { 68 #[clap(long, short)] 69 reset: bool, 70 }, 71 /// Run online tests 72 Online { 73 #[clap(long, short)] 74 reset: bool, 75 }, 76 } 77 78 fn step(step: &str) { 79 println!("{}", step.green()); 80 } 81 82 struct Harness<'a> { 83 pool: &'a PgPool, 84 client: Client<'a>, 85 wire: Client<'a>, 86 client_payto: FullCyclosPayto, 87 wire_payto: FullCyclosPayto, 88 payment_type_id: i64, 89 account_type_id: i64, 90 currency: Currency, 91 root: CompactString, 92 } 93 94 impl<'a> Harness<'a> { 95 async fn balance(&self) -> (Decimal, Decimal) { 96 let (exchange, client) = 97 tokio::try_join!(self.wire.accounts(), self.client.accounts()).unwrap(); 98 ( 99 exchange[0] 100 .status 101 .available_balance 102 .unwrap_or(exchange[0].status.balance), 103 client[0] 104 .status 105 .available_balance 106 .unwrap_or(client[0].status.balance), 107 ) 108 } 109 110 /// Send transaction from client to exchange 111 async fn client_send(&self, subject: &str, amount: Decimal) -> i64 { 112 *self 113 .client 114 .direct_payment(*self.wire_payto.id, self.payment_type_id, amount, subject) 115 .await 116 .unwrap() 117 .id 118 } 119 120 /// Send transaction from exchange to client 121 async fn exchange_send(&self, subject: &str, amount: Decimal) -> i64 { 122 *self 123 .wire 124 .direct_payment(*self.client_payto.id, self.payment_type_id, amount, subject) 125 .await 126 .unwrap() 127 .id 128 } 129 130 /// Chargeback a transfer 131 async fn chargeback(&self, id: i64) -> i64 { 132 self.client.chargeback(id).await.unwrap() 133 } 134 135 /// Fetch last transfer related to client 136 async fn client_last_transfer(&self) -> HistoryItem { 137 self.client 138 .history(*self.client_payto.id, OrderBy::DateDesc, 0, None) 139 .await 140 .unwrap() 141 .page 142 .remove(0) 143 } 144 145 /// Run the worker once 146 async fn worker(&'a self) -> WorkerResult { 147 let db = &mut self.pool.acquire().await.unwrap().detach(); 148 Worker { 149 db, 150 currency: self.currency.clone(), 151 client: &self.wire, 152 account_type: AccountType::Exchange, 153 account_type_id: self.account_type_id, 154 payment_type_id: self.payment_type_id, 155 } 156 .run() 157 .await 158 } 159 160 async fn expect_incoming(&self, key: EddsaPublicKey) { 161 let transfer = db::incoming_history( 162 self.pool, 163 &History { 164 page: Page { 165 limit: -1, 166 offset: None, 167 }, 168 timeout_ms: None, 169 }, 170 &self.currency, 171 &self.root, 172 dummy_listen, 173 ) 174 .await 175 .unwrap(); 176 assert!(matches!( 177 transfer.first().unwrap(), 178 IncomingBankTransaction::Reserve { reserve_pub, .. } if *reserve_pub == key, 179 )); 180 } 181 182 async fn custom_transfer(&self, amount: Decimal, creditor_id: i64, creditor_name: &str) -> u64 { 183 let res = db::make_transfer( 184 self.pool, 185 &db::Transfer { 186 request_uid: HashCode::rand(), 187 amount, 188 exchange_base_url: url("https://test.com"), 189 metadata: None, 190 wtid: ShortHashCode::rand(), 191 creditor_id, 192 creditor_name: CompactString::new(creditor_name), 193 }, 194 &Timestamp::now(), 195 ) 196 .await 197 .unwrap(); 198 match res { 199 TransferResult::Success { id, .. } => id, 200 TransferResult::RequestUidReuse | TransferResult::WtidReuse => unreachable!(), 201 } 202 } 203 204 async fn transfer(&self, amount: Decimal) -> u64 { 205 self.custom_transfer(amount, *self.client_payto.id, &self.client_payto.name) 206 .await 207 } 208 209 async fn transfer_id(&self, transfer_id: u64) -> i64 { 210 sqlx::query( 211 "SELECT transfer_id 212 FROM transfer 213 JOIN initiated USING (initiated_id) 214 JOIN tx_out USING (tx_out_id) 215 WHERE initiated_id=$1", 216 ) 217 .bind(transfer_id as i64) 218 .try_map(|r: PgRow| r.try_get(0)) 219 .fetch_one(self.pool) 220 .await 221 .unwrap() 222 } 223 224 async fn expect_transfer_status(&self, id: u64, status: TransferState, msg: Option<&str>) { 225 let mut attempts = 0; 226 loop { 227 let transfer = db::transfer_by_id(self.pool, id, &self.currency, &self.root) 228 .await 229 .unwrap() 230 .unwrap(); 231 if (transfer.status, transfer.status_msg.as_deref()) == (status, msg) { 232 return; 233 } 234 if attempts > 40 { 235 assert_eq!( 236 (transfer.status, transfer.status_msg.as_deref()), 237 (status, msg) 238 ); 239 } 240 attempts += 1; 241 tokio::time::sleep(Duration::from_millis(200)).await; 242 } 243 } 244 } 245 246 struct Balances<'a> { 247 client: &'a Harness<'a>, 248 exchange_balance: Decimal, 249 client_balance: Decimal, 250 } 251 252 impl<'a> Balances<'a> { 253 pub async fn new(client: &'a Harness<'a>) -> Self { 254 let (exchange_balance, client_balance) = client.balance().await; 255 Self { 256 client, 257 exchange_balance, 258 client_balance, 259 } 260 } 261 262 async fn expect_add(&mut self, diff: Decimal) { 263 self.exchange_balance = self.exchange_balance.try_add(&diff).unwrap(); 264 self.client_balance = self.client_balance.try_sub(&diff).unwrap(); 265 let mut attempts = 0; 266 loop { 267 let current = self.client.balance().await; 268 if current == (self.exchange_balance, self.client_balance) { 269 return; 270 } 271 if attempts > 40 { 272 assert_eq!( 273 current, 274 (self.exchange_balance, self.client_balance), 275 "({} {}) +{diff}", 276 current.0, 277 current.1 278 ); 279 } 280 attempts += 1; 281 tokio::time::sleep(Duration::from_millis(200)).await; 282 } 283 } 284 285 async fn expect_sub(&mut self, diff: Decimal) { 286 self.exchange_balance = self.exchange_balance.try_sub(&diff).unwrap(); 287 self.client_balance = self.client_balance.try_add(&diff).unwrap(); 288 289 let mut attempts = 0; 290 loop { 291 let current = self.client.balance().await; 292 if current == (self.exchange_balance, self.client_balance) { 293 return; 294 } 295 if attempts > 40 { 296 assert_eq!( 297 current, 298 (self.exchange_balance, self.client_balance), 299 "({} {}) -{diff}", 300 current.0, 301 current.1 302 ); 303 } 304 attempts += 1; 305 tokio::time::sleep(Duration::from_millis(200)).await; 306 } 307 } 308 } 309 310 /// Run logic tests against local Cyclos backend 311 async fn logic_harness(cfg: &Config, reset: bool) -> anyhow::Result<()> { 312 step("Run Cyclos logic harness tests"); 313 314 step("Prepare db"); 315 let pool = dbinit(cfg, reset).await?; 316 317 let client = http_client::client()?; 318 setup::setup(cfg, reset, &client).await?; 319 let cfg = HarnessCfg::parse(cfg)?; 320 let wire = Client { 321 client: &client, 322 api_url: &cfg.worker.host.api_url, 323 auth: &CyclosAuth::Basic { 324 username: cfg.worker.host.username, 325 password: cfg.worker.host.password, 326 }, 327 }; 328 let client = Client { 329 client: &client, 330 api_url: &cfg.worker.host.api_url, 331 auth: &CyclosAuth::Basic { 332 username: cfg.username, 333 password: cfg.password, 334 }, 335 }; 336 let harness = Harness { 337 pool: &pool, 338 client_payto: client 339 .whoami() 340 .await 341 .unwrap() 342 .payto(cfg.worker.root.clone()), 343 wire_payto: wire.whoami().await.unwrap().payto(cfg.worker.root.clone()), 344 client, 345 wire, 346 currency: cfg.worker.currency, 347 root: cfg.worker.root, 348 payment_type_id: *cfg.worker.payment_type_id, 349 account_type_id: *cfg.worker.account_type_id, 350 }; 351 352 step("Warmup"); 353 harness.worker().await.unwrap(); 354 let now = Timestamp::now(); 355 let balance = &mut Balances::new(&harness).await; 356 357 step("Test incoming talerable transaction"); 358 // Send talerable transaction 359 let reserve_pub = EddsaPublicKey::rand(); 360 let amount = decimal("3.3"); 361 harness 362 .client_send(&format!("Taler {reserve_pub}"), amount) 363 .await; 364 // Sync and register 365 harness.worker().await?; 366 harness.expect_incoming(reserve_pub).await; 367 balance.expect_add(amount).await; 368 369 step("Test incoming malformed transaction"); 370 // Send malformed transaction 371 let amount = decimal("3.4"); 372 harness 373 .client_send(&format!("Malformed test {now}"), amount) 374 .await; 375 balance.expect_add(amount).await; 376 // Sync and bounce 377 harness.worker().await?; 378 balance.expect_sub(amount).await; 379 380 step("Test transfer transactions"); 381 let amount = decimal("3.5"); 382 // Init a transfer to client 383 let transfer_id = harness.transfer(amount).await; 384 // Check transfer pending 385 harness 386 .expect_transfer_status(transfer_id, TransferState::pending, None) 387 .await; 388 // Should send 389 harness.worker().await?; 390 // Wait for transaction to finalize 391 balance.expect_sub(amount).await; 392 // Should register 393 harness.worker().await?; 394 // Check transfer is now successful 395 harness 396 .expect_transfer_status(transfer_id, TransferState::success, None) 397 .await; 398 399 step("Test transfer to self"); 400 // Init a transfer to self 401 let transfer_id = harness 402 .custom_transfer( 403 decimal("10.1"), 404 *harness.wire_payto.id, 405 &harness.wire_payto.name, 406 ) 407 .await; 408 // Should failed 409 harness.worker().await?; 410 // Check transfer failed 411 harness 412 .expect_transfer_status( 413 transfer_id, 414 TransferState::permanent_failure, 415 Some("permissionDenied - The operation was denied because a required permission was not granted"), 416 ) 417 .await; 418 419 step("Test transfer to unknown account"); 420 // Init a transfer to unknown 421 let transfer_id = harness 422 .custom_transfer(decimal("10.1"), 42, "Unknown") 423 .await; 424 // Should failed 425 harness.worker().await?; 426 // Check transfer failed 427 harness 428 .expect_transfer_status( 429 transfer_id, 430 TransferState::permanent_failure, 431 Some("unknown BasicUser 42"), 432 ) 433 .await; 434 435 step("Test unexpected outgoing"); 436 // Manual tx from the exchange 437 let amount = decimal("4"); 438 harness 439 .exchange_send(&format!("What is this ? {now}"), amount) 440 .await; 441 harness.worker().await?; 442 // Wait for transaction to finalize 443 balance.expect_sub(amount).await; 444 harness.worker().await?; 445 446 step("Test transfer chargeback"); 447 let amount = decimal("10.1"); 448 // Init a transfer to client 449 let transfer_id = harness.transfer(amount).await; 450 harness 451 .expect_transfer_status(transfer_id, TransferState::pending, None) 452 .await; 453 // Send 454 harness.worker().await?; 455 balance.expect_sub(amount).await; 456 harness 457 .expect_transfer_status(transfer_id, TransferState::pending, None) 458 .await; 459 // Sync 460 harness.worker().await?; 461 harness 462 .expect_transfer_status(transfer_id, TransferState::success, None) 463 .await; 464 // Chargeback 465 harness 466 .chargeback(harness.transfer_id(transfer_id).await) 467 .await; 468 balance.expect_add(amount).await; 469 harness.worker().await?; 470 harness 471 .expect_transfer_status( 472 transfer_id, 473 TransferState::late_failure, 474 Some("charged back"), 475 ) 476 .await; 477 478 step("Test recover unexpected chargeback"); 479 let amount = decimal("10.2"); 480 // Manual tx from the exchange 481 harness 482 .exchange_send(&format!("What is this chargebacked ? {now}"), amount) 483 .await; 484 balance.expect_sub(amount).await; 485 // Chargeback 486 harness 487 .chargeback(*harness.client_last_transfer().await.id) 488 .await; 489 balance.expect_add(amount).await; 490 // Sync 491 harness.worker().await?; 492 493 step("Test direct-payment failure"); 494 let amount = decimal("10.3"); 495 harness.transfer(amount).await; 496 set_failure_scenario(&["direct-payment"]); 497 assert!(matches!( 498 harness.worker().await.unwrap_err(), 499 WorkerError::Injected(InjectedErr("direct-payment")) 500 )); 501 harness.worker().await?; 502 balance.expect_sub(amount).await; 503 harness.worker().await?; 504 505 step("Test chargeback failure"); 506 // Send malformed transaction 507 let amount = decimal("10.4"); 508 harness 509 .client_send(&format!("Malformed test {now} with failure"), amount) 510 .await; 511 balance.expect_add(amount).await; 512 // Sync and bounce 513 set_failure_scenario(&["chargeback"]); 514 assert!(matches!( 515 harness.worker().await.unwrap_err(), 516 WorkerError::Injected(InjectedErr("chargeback")) 517 )); 518 balance.expect_sub(amount).await; 519 // Sync recover 520 harness.worker().await?; 521 522 step("Finish"); 523 harness.worker().await?; 524 balance.expect_add(Decimal::zero()).await; 525 Ok(()) 526 } 527 528 /// Run online tests against real Cyclos backend 529 async fn online_harness(config: &Config, reset: bool) -> anyhow::Result<()> { 530 step("Run Cyclos online harness tests"); 531 532 step("Prepare db"); 533 let pool = dbinit(config, reset).await?; 534 let http_client = http_client::client()?; 535 setup::setup(config, reset, &http_client).await?; 536 let cfg = HarnessCfg::parse(config)?; 537 let wire = Client { 538 client: &http_client, 539 api_url: &cfg.worker.host.api_url, 540 auth: &CyclosAuth::Basic { 541 username: cfg.worker.host.username, 542 password: cfg.worker.host.password, 543 }, 544 }; 545 let client = Client { 546 client: &http_client, 547 api_url: &cfg.worker.host.api_url, 548 auth: &CyclosAuth::Basic { 549 username: cfg.username, 550 password: cfg.password, 551 }, 552 }; 553 554 let harness = Harness { 555 pool: &pool, 556 client_payto: client 557 .whoami() 558 .await 559 .unwrap() 560 .payto(cfg.worker.root.clone()), 561 wire_payto: wire.whoami().await.unwrap().payto(cfg.worker.root.clone()), 562 client, 563 wire, 564 currency: cfg.worker.currency, 565 root: cfg.worker.root, 566 payment_type_id: *cfg.worker.payment_type_id, 567 account_type_id: *cfg.worker.account_type_id, 568 }; 569 570 step("Warmup worker"); 571 let _worker_task = { 572 let client = http_client.clone(); 573 let pool = pool.clone(); 574 let config = config.clone(); 575 tokio::spawn(async move { run_worker(&config, &pool, &client, false).await }) 576 }; 577 tokio::time::sleep(Duration::from_secs(5)).await; 578 let now = Timestamp::now(); 579 let balance = &mut Balances::new(&harness).await; 580 581 step("Test incoming transactions"); 582 let taler_amount = decimal("3"); 583 let malformed_amount = decimal("4"); 584 let reserve_pub = EddsaPublicKey::rand(); 585 harness 586 .client_send(&format!("Taler {reserve_pub}"), taler_amount) 587 .await; 588 harness 589 .client_send(&format!("Malformed test {now}"), malformed_amount) 590 .await; 591 balance.expect_add(taler_amount).await; 592 harness.expect_incoming(reserve_pub).await; 593 594 step("Test outgoing transactions"); 595 let self_amount = decimal("1"); 596 let taler_amount = decimal("2"); 597 598 let transfer_self = harness 599 .custom_transfer( 600 self_amount, 601 *harness.wire_payto.id, 602 &harness.wire_payto.name, 603 ) 604 .await; 605 let transfer_id = harness.transfer(taler_amount).await; 606 balance.expect_sub(taler_amount).await; 607 harness 608 .expect_transfer_status( 609 transfer_self, 610 TransferState::permanent_failure, 611 Some("permissionDenied - The operation was denied because a required permission was not granted"), 612 ) 613 .await; 614 harness 615 .expect_transfer_status(transfer_id, TransferState::success, None) 616 .await; 617 618 step("Finish"); 619 tokio::time::sleep(Duration::from_secs(5)).await; 620 balance.expect_add(Decimal::zero()).await; 621 622 Ok(()) 623 } 624 625 fn main() { 626 let args = Args::parse(); 627 taler_main(CONFIG_SOURCE, args.common, async |cfg| match args.cmd { 628 Command::Logic { reset } => logic_harness(&cfg, reset).await, 629 Command::Online { reset } => online_harness(&cfg, reset).await, 630 }); 631 }