worker.rs (27087B)
1 /* 2 This file is part of TALER 3 Copyright (C) 2025, 2026 Taler Systems SA 4 5 TALER is free software; you can redistribute it and/or modify it under the 6 terms of the GNU Affero General Public License as published by the Free Software 7 Foundation; either version 3, or (at your option) any later version. 8 9 TALER is distributed in the hope that it will be useful, but WITHOUT ANY 10 WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR 11 A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. 12 13 You should have received a copy of the GNU Affero General Public License along with 14 TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> 15 */ 16 17 use std::{num::ParseIntError, time::Duration}; 18 19 use aws_lc_rs::signature::EcdsaKeyPair; 20 use failure_injection::{InjectedErr, fail_point}; 21 use http_client::ApiErr; 22 use jiff::{Timestamp, Zoned, civil::Date}; 23 use sqlx::{Acquire as _, PgConnection, PgPool, postgres::PgListener}; 24 use taler_api::subject::{self, parse_incoming_unstructured}; 25 use taler_common::{ 26 ExpoBackoffDecorr, 27 config::Config, 28 types::{ 29 amount::{self}, 30 iban::IBAN, 31 }, 32 }; 33 use tracing::{debug, error, info, trace, warn}; 34 35 use crate::{ 36 FullHuPayto, HuIban, 37 config::{AccountType, WorkerCfg}, 38 db::{self, AddIncomingResult, Initiated, RegisterResult, TxIn, TxOut, TxOutKind}, 39 magnet_api::{ 40 api::MagnetErr, 41 client::{ApiClient, AuthClient}, 42 types::{Direction, Next, Order, TxDto, TxStatus}, 43 }, 44 setup, 45 }; 46 47 // const TXS_CURSOR_KEY: &str = "txs_cursor"; TODO cursor is broken 48 49 #[derive(Debug, thiserror::Error)] 50 pub enum WorkerError { 51 #[error(transparent)] 52 Db(#[from] sqlx::Error), 53 #[error(transparent)] 54 Api(#[from] ApiErr<MagnetErr>), 55 #[error("Another worker is running concurrently")] 56 Concurrency, 57 #[error(transparent)] 58 Injected(#[from] InjectedErr), 59 } 60 61 pub type WorkerResult = Result<(), WorkerError>; 62 63 pub async fn run_worker( 64 cfg: &Config, 65 pool: &PgPool, 66 client: &http_client::Client, 67 transient: bool, 68 ) -> anyhow::Result<()> { 69 let cfg = WorkerCfg::parse(cfg)?; 70 let keys = setup::load(&cfg)?; 71 let client = AuthClient::new(client, &cfg.api_url, &cfg.consumer).upgrade(&keys.access_token); 72 73 if transient { 74 let mut conn = pool.acquire().await?; 75 let account = client.account(cfg.payto.bban()).await?; 76 Worker { 77 client: &client, 78 db: &mut conn, 79 account_number: &account.number, 80 account_code: account.code, 81 key: &keys.signing_key, 82 account_type: cfg.account_type, 83 ignore_tx_before: cfg.ignore_tx_before, 84 ignore_bounces_before: cfg.ignore_bounces_before, 85 } 86 .run() 87 .await?; 88 return Ok(()); 89 } 90 91 let mut jitter = ExpoBackoffDecorr::default(); 92 93 loop { 94 let res: WorkerResult = async { 95 let account = client.account(cfg.payto.bban()).await?; 96 let db = &mut PgListener::connect_with(pool).await?; 97 98 // Listen to all channels 99 db.listen_all(["transfer"]).await?; 100 101 info!(target: "worker", "running at initialisation"); 102 103 loop { 104 debug!(target: "worker", "running"); 105 Worker { 106 client: &client, 107 db: db.acquire().await?, 108 account_number: &account.number, 109 account_code: account.code, 110 key: &keys.signing_key, 111 account_type: cfg.account_type, 112 ignore_tx_before: cfg.ignore_tx_before, 113 ignore_bounces_before: cfg.ignore_bounces_before, 114 } 115 .run() 116 .await?; 117 jitter.reset(); 118 119 // Wait for notifications or sync timeout 120 if let Ok(res) = tokio::time::timeout(cfg.frequency, db.try_recv()).await { 121 let mut ntf = res?; 122 // Conflate all notifications 123 while let Some(n) = ntf { 124 debug!(target: "worker", "notification from {}", n.channel()); 125 ntf = db.next_buffered(); 126 } 127 128 if ntf.is_some() { 129 info!(target: "worker", "running at db trigger"); 130 } else { 131 info!(target: "worker", "running at frequency"); 132 } 133 } 134 } 135 } 136 .await; 137 let err = res.unwrap_err(); 138 error!(target: "worker", "{err}"); 139 140 if matches!(err, WorkerError::Concurrency) { 141 // This error won't resolve by itself easily and it mean we are actually making progress 142 // in another worker so we can jitter more aggressively 143 tokio::time::sleep(Duration::from_secs(15)).await; 144 } 145 tokio::time::sleep(jitter.backoff()).await; 146 } 147 } 148 149 pub struct Worker<'a> { 150 pub client: &'a ApiClient<'a>, 151 pub db: &'a mut PgConnection, 152 pub account_number: &'a str, 153 pub account_code: u64, 154 pub key: &'a EcdsaKeyPair, 155 pub account_type: AccountType, 156 pub ignore_tx_before: Option<Date>, 157 pub ignore_bounces_before: Option<Date>, 158 } 159 160 impl Worker<'_> { 161 /// Run a single worker pass 162 pub async fn run(&mut self) -> WorkerResult { 163 // Some worker operations are not idempotent, therefore it's not safe to have multiple worker 164 // running concurrently. We use a global Postgres advisory lock to prevent it. 165 if !db::worker_lock(self.db).await? { 166 return Err(WorkerError::Concurrency); 167 }; 168 169 // Sync transactions 170 let mut next: Option<Next> = None; //kv_get(&mut *self.db, TXS_CURSOR_KEY).await?; TODO cursor logic is broken and cannot be stored & reused 171 let mut all_final = true; 172 let mut first = true; 173 loop { 174 let page = self 175 .client 176 .page_tx( 177 Direction::Both, 178 Order::Ascending, 179 100, 180 self.account_number, 181 &next, 182 first, 183 ) 184 .await?; 185 first = false; 186 next = page.next; 187 for item in page.list { 188 all_final &= item.tx.status.is_final(); 189 let tx = extract_tx_info(item.tx); 190 match tx { 191 Tx::In(tx_in) => { 192 // We only register final successful incoming transactions 193 if tx_in.status != TxStatus::Completed { 194 debug!(target: "worker", "pending or failed in {tx_in}"); 195 continue; 196 } 197 198 if let Some(before) = self.ignore_tx_before 199 && tx_in.value_date < before 200 { 201 debug!(target: "worker", "ignore in {tx_in}"); 202 continue; 203 } 204 let bounce = async |db: &mut PgConnection, 205 reason: &str| 206 -> Result<(), WorkerError> { 207 if let Some(before) = self.ignore_bounces_before 208 && tx_in.value_date < before 209 { 210 match db::register_tx_in(db, &tx_in, &None, &Timestamp::now()) 211 .await? 212 { 213 AddIncomingResult::Success { new, .. } => { 214 if new { 215 info!(target: "worker", "in {tx_in} skip bounce: {reason}"); 216 } else { 217 trace!(target: "worker", "in {tx_in} already skip bounce "); 218 } 219 } 220 AddIncomingResult::ReservePubReuse 221 | AddIncomingResult::UnknownMapping 222 | AddIncomingResult::MappingReuse => unreachable!(), 223 } 224 } else { 225 let res = db::register_bounce_tx_in( 226 db, 227 &tx_in, 228 reason, 229 &Timestamp::now(), 230 ) 231 .await?; 232 233 if res.tx_new { 234 info!(target: "worker", 235 "in {tx_in} bounced in {}: {reason}", 236 res.bounce_id 237 ); 238 } else { 239 trace!(target: "worker", 240 "in {tx_in} already seen and bounced in {}: {reason}", 241 res.bounce_id 242 ); 243 } 244 } 245 Ok(()) 246 }; 247 match self.account_type { 248 AccountType::Exchange => { 249 match parse_incoming_unstructured(&tx_in.subject) { 250 Ok(subject) => match db::register_tx_in( 251 self.db, 252 &tx_in, 253 &Some(subject), 254 &Timestamp::now(), 255 ) 256 .await? 257 { 258 AddIncomingResult::Success { new, .. } => { 259 if new { 260 info!(target: "worker", "in {tx_in}"); 261 } else { 262 trace!(target: "worker", "in {tx_in} already seen"); 263 } 264 } 265 AddIncomingResult::ReservePubReuse => { 266 bounce(self.db, "reserve pub reuse").await? 267 } 268 AddIncomingResult::UnknownMapping => { 269 bounce(self.db, "unknown mapping").await? 270 } 271 AddIncomingResult::MappingReuse => { 272 bounce(self.db, "mapping reuse").await? 273 } 274 }, 275 Err(e) => bounce(self.db, &e.to_string()).await?, 276 } 277 } 278 AccountType::Normal => { 279 match db::register_tx_in(self.db, &tx_in, &None, &Timestamp::now()) 280 .await? 281 { 282 AddIncomingResult::Success { new, .. } => { 283 if new { 284 info!(target: "worker", "in {tx_in}"); 285 } else { 286 trace!(target: "worker", "in {tx_in} already seen"); 287 } 288 } 289 AddIncomingResult::ReservePubReuse 290 | AddIncomingResult::UnknownMapping 291 | AddIncomingResult::MappingReuse => unreachable!(), 292 } 293 } 294 } 295 } 296 Tx::Out(tx_out) => { 297 match tx_out.status { 298 TxStatus::ToBeRecorded => { 299 self.recover_tx(&tx_out).await?; 300 continue; 301 } 302 TxStatus::PendingFirstSignature 303 | TxStatus::PendingSecondSignature 304 | TxStatus::PendingProcessing 305 | TxStatus::Verified 306 | TxStatus::PartiallyCompleted 307 | TxStatus::UnderReview => { 308 // Still pending 309 debug!(target: "worker", "pending out {tx_out}"); 310 continue; 311 } 312 TxStatus::Rejected | TxStatus::Canceled | TxStatus::Completed => {} 313 } 314 match self.account_type { 315 AccountType::Exchange => { 316 let kind = if let Ok(subject) = 317 subject::parse_outgoing(&tx_out.subject) 318 { 319 TxOutKind::Talerable(subject) 320 } else if let Ok(bounced) = parse_bounce_outgoing(&tx_out.subject) { 321 TxOutKind::Bounce(bounced) 322 } else { 323 TxOutKind::Simple 324 }; 325 if tx_out.status == TxStatus::Completed { 326 let res = db::register_tx_out( 327 self.db, 328 &tx_out, 329 &kind, 330 &Timestamp::now(), 331 ) 332 .await?; 333 match res.result { 334 RegisterResult::idempotent => match kind { 335 TxOutKind::Simple => { 336 trace!(target: "worker", "out malformed {tx_out} already seen") 337 } 338 TxOutKind::Bounce(_) => { 339 trace!(target: "worker", "out bounce {tx_out} already seen") 340 } 341 TxOutKind::Talerable(_) => { 342 trace!(target: "worker", "out {tx_out} already seen") 343 } 344 }, 345 RegisterResult::known => match kind { 346 TxOutKind::Simple => { 347 warn!(target: "worker", "out malformed {tx_out}") 348 } 349 TxOutKind::Bounce(_) => { 350 info!(target: "worker", "out bounce {tx_out}") 351 } 352 TxOutKind::Talerable(_) => { 353 info!(target: "worker", "out {tx_out}") 354 } 355 }, 356 RegisterResult::recovered => match kind { 357 TxOutKind::Simple => { 358 warn!(target: "worker", "out malformed (recovered) {tx_out}") 359 } 360 TxOutKind::Bounce(_) => { 361 warn!(target: "worker", "out bounce (recovered) {tx_out}") 362 } 363 TxOutKind::Talerable(_) => { 364 warn!(target: "worker", "out (recovered) {tx_out}") 365 } 366 }, 367 } 368 } else { 369 let bounced = match kind { 370 TxOutKind::Simple => None, 371 TxOutKind::Bounce(bounced) => Some(bounced), 372 TxOutKind::Talerable(_) => None, 373 }; 374 let res = db::register_tx_out_failure( 375 self.db, 376 tx_out.code, 377 bounced, 378 &Timestamp::now(), 379 ) 380 .await?; 381 if let Some(id) = res.initiated_id { 382 if res.new { 383 error!(target: "worker", "out failure {id} {tx_out}"); 384 } else { 385 trace!(target: "worker", "out failure {id} {tx_out} already seen"); 386 } 387 } 388 } 389 } 390 AccountType::Normal => { 391 if tx_out.status == TxStatus::Completed { 392 let res = db::register_tx_out( 393 self.db, 394 &tx_out, 395 &TxOutKind::Simple, 396 &Timestamp::now(), 397 ) 398 .await?; 399 match res.result { 400 RegisterResult::idempotent => { 401 trace!(target: "worker", "out {tx_out} already seen"); 402 } 403 RegisterResult::known => { 404 info!(target: "worker", "out {tx_out}"); 405 } 406 RegisterResult::recovered => { 407 warn!(target: "worker", "out (recovered) {tx_out}"); 408 } 409 } 410 } else { 411 let res = db::register_tx_out_failure( 412 self.db, 413 tx_out.code, 414 None, 415 &Timestamp::now(), 416 ) 417 .await?; 418 if let Some(id) = res.initiated_id { 419 if res.new { 420 error!(target: "worker", "out failure {id} {tx_out}"); 421 } else { 422 trace!(target: "worker", "out failure {id} {tx_out} already seen"); 423 } 424 } 425 } 426 } 427 } 428 } 429 } 430 } 431 432 if let Some(_next) = &next { 433 // Update in db cursor only if all previous transactions where final 434 if all_final { 435 // debug!(target: "worker", "advance cursor {next:?}"); 436 // kv_set(&mut *self.db, TXS_CURSOR_KEY, &next).await?; TODO cursor is broken 437 } 438 } else { 439 break; 440 } 441 } 442 443 // Send transactions 444 let start = Timestamp::now(); 445 let now = Zoned::now(); 446 loop { 447 let batch = db::pending_batch(&mut *self.db, &start).await?; 448 if batch.is_empty() { 449 break; 450 } 451 for tx in batch { 452 debug!(target: "worker", "send tx {tx}"); 453 self.init_tx(&tx, &now).await?; 454 } 455 } 456 Ok(()) 457 } 458 459 /// Try to sign an unsigned initiated transaction 460 pub async fn recover_tx(&mut self, tx: &TxOut) -> WorkerResult { 461 if db::initiated_exists_for_code(&mut *self.db, tx.code) 462 .await? 463 .is_some() 464 { 465 // Known initiated we submit it 466 assert_eq!(tx.amount.frac, 0); 467 self.submit_tx( 468 tx.code, 469 -(tx.amount.val as f64), 470 &tx.value_date, 471 tx.creditor.bban(), 472 ) 473 .await?; 474 } else { 475 // The transaction is unknown (we failed after creating it and before storing it in the db) 476 // we delete it 477 self.client.delete_tx(tx.code).await?; 478 debug!(target: "worker", "out {}: delete uncompleted orphan", tx.code); 479 } 480 481 Ok(()) 482 } 483 484 /// Create and sign a forint transfer 485 pub async fn init_tx(&mut self, tx: &Initiated, now: &Zoned) -> WorkerResult { 486 trace!(target: "worker", "init tx {tx}"); 487 assert_eq!(tx.amount.frac, 0); 488 let date = now.date(); 489 // Initialize the new transaction, on failure an orphan initiated transaction can be created 490 let res = self 491 .client 492 .init_tx( 493 self.account_code, 494 tx.amount.val as f64, 495 &tx.subject, 496 &date, 497 &tx.creditor.name, 498 tx.creditor.bban(), 499 ) 500 .await; 501 fail_point("init-tx")?; 502 let info = match res { 503 // Check if succeeded 504 Ok(info) => { 505 // Update transaction status, on failure the initiated transaction will be orphan 506 db::initiated_submit_success(&mut *self.db, tx.id, &Timestamp::now(), info.code) 507 .await?; 508 info 509 } 510 Err(e) => { 511 if let MagnetErr::Magnet(e) = &e.err { 512 // Check if error is permanent 513 if matches!( 514 (e.error_code, e.short_message.as_str()), 515 (404, "BSZLA_NEM_TALALHATO") // Unknown account 516 | (409, "FORRAS_SZAMLA_ESZAMLA_EGYEZIK") // Same account 517 ) { 518 db::initiated_submit_permanent_failure( 519 &mut *self.db, 520 tx.id, 521 &Timestamp::now(), 522 &e.to_string(), 523 ) 524 .await?; 525 error!(target: "worker", "initiated failure {tx}: {e}"); 526 return WorkerResult::Ok(()); 527 } 528 } 529 return Err(e.into()); 530 } 531 }; 532 trace!(target: "worker", "init tx {}", info.code); 533 534 // Sign transaction 535 self.submit_tx(info.code, info.amount, &date, tx.creditor.bban()) 536 .await?; 537 Ok(()) 538 } 539 540 /** Submit an initiated forint transfer */ 541 pub async fn submit_tx( 542 &mut self, 543 tx_code: u64, 544 amount: f64, 545 date: &Date, 546 creditor: &str, 547 ) -> WorkerResult { 548 debug!(target: "worker", "submit tx {tx_code}"); 549 fail_point("submit-tx")?; 550 // Submit an initiated transaction, on failure we will retry 551 match self 552 .client 553 .submit_tx( 554 self.key, 555 self.account_number, 556 tx_code, 557 amount, 558 date, 559 creditor, 560 ) 561 .await 562 { 563 Ok(_) => Ok(()), 564 Err(e) => { 565 if let MagnetErr::Magnet(e) = &e.err { 566 // Check if soft failure 567 if matches!( 568 (e.error_code, e.short_message.as_str()), 569 (409, "TRANZAKCIO_ROSSZ_STATUS") // Already summited or cannot be signed 570 ) { 571 warn!(target: "worker", "submit tx {tx_code}: {e}"); 572 return Ok(()); 573 } 574 } 575 Err(e.into()) 576 } 577 } 578 } 579 } 580 581 pub enum Tx { 582 In(TxIn), 583 Out(TxOut), 584 } 585 586 pub fn extract_tx_info(tx: TxDto) -> Tx { 587 // TODO amount from f64 without allocations 588 let amount = amount::amount(format!("{}:{}", tx.currency, tx.amount.abs())); 589 // TODO we should support non hungarian account and error handling 590 let iban = if tx.counter_account.starts_with("HU") { 591 let iban: IBAN = tx.counter_account.parse().unwrap(); 592 HuIban::try_from(iban).unwrap() 593 } else { 594 HuIban::from_bban(&tx.counter_account).unwrap() 595 }; 596 let counter_account = FullHuPayto::new(iban, &tx.counter_name); 597 if tx.amount.is_sign_positive() { 598 Tx::In(TxIn { 599 code: tx.code, 600 amount, 601 subject: tx.subject.unwrap_or_default(), 602 debtor: counter_account, 603 value_date: tx.value_date, 604 status: tx.status, 605 }) 606 } else { 607 Tx::Out(TxOut { 608 code: tx.code, 609 amount, 610 subject: tx.subject.unwrap_or_default(), 611 creditor: counter_account, 612 value_date: tx.value_date, 613 status: tx.status, 614 }) 615 } 616 } 617 618 #[derive(Debug, thiserror::Error)] 619 pub enum BounceSubjectErr { 620 #[error("missing parts")] 621 MissingParts, 622 #[error("not a bounce")] 623 NotBounce, 624 #[error("malformed bounced id: {0}")] 625 Id(#[from] ParseIntError), 626 } 627 628 pub fn parse_bounce_outgoing(subject: &str) -> Result<u32, BounceSubjectErr> { 629 let (prefix, id) = subject 630 .rsplit_once(" ") 631 .ok_or(BounceSubjectErr::MissingParts)?; 632 if !prefix.starts_with("bounce") { 633 return Err(BounceSubjectErr::NotBounce); 634 } 635 let id: u32 = id.parse()?; 636 Ok(id) 637 }