worker.rs (27187B)
1 /* 2 This file is part of TALER 3 Copyright (C) 2025, 2026 Taler Systems SA 4 5 TALER is free software; you can redistribute it and/or modify it under the 6 terms of the GNU Affero General Public License as published by the Free Software 7 Foundation; either version 3, or (at your option) any later version. 8 9 TALER is distributed in the hope that it will be useful, but WITHOUT ANY 10 WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR 11 A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. 12 13 You should have received a copy of the GNU Affero General Public License along with 14 TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> 15 */ 16 17 use std::{num::ParseIntError, time::Duration}; 18 19 use aws_lc_rs::signature::EcdsaKeyPair; 20 use failure_injection::{InjectedErr, fail_point}; 21 use http_client::ApiErr; 22 use jiff::{Timestamp, Zoned, civil::Date}; 23 use sqlx::{Acquire as _, PgConnection, PgPool, postgres::PgListener}; 24 use taler_api::subject::{self, parse_incoming_unstructured}; 25 use taler_common::{ 26 ExpoBackoffDecorr, 27 config::Config, 28 types::{ 29 amount::{self}, 30 iban::IBAN, 31 }, 32 }; 33 use tracing::{debug, error, info, trace, warn}; 34 35 use crate::{ 36 FullHuPayto, HuIban, 37 config::{AccountType, WorkerCfg}, 38 db::{self, AddIncomingResult, Initiated, RegisterResult, TxIn, TxOut, TxOutKind}, 39 magnet_api::{ 40 api::MagnetErr, 41 client::{ApiClient, AuthClient}, 42 types::{Direction, Next, Order, TxDto, TxStatus}, 43 }, 44 setup, 45 }; 46 47 // const TXS_CURSOR_KEY: &str = "txs_cursor"; TODO cursor is broken 48 49 #[derive(Debug, thiserror::Error)] 50 pub enum WorkerError { 51 #[error(transparent)] 52 Db(#[from] sqlx::Error), 53 #[error(transparent)] 54 Api(#[from] ApiErr<MagnetErr>), 55 #[error("Another worker is running concurrently")] 56 Concurrency, 57 #[error(transparent)] 58 Injected(#[from] InjectedErr), 59 } 60 61 pub type WorkerResult = Result<(), WorkerError>; 62 63 pub async fn run_worker( 64 cfg: &Config, 65 pool: &PgPool, 66 client: &http_client::Client, 67 transient: bool, 68 ) -> anyhow::Result<()> { 69 let cfg = WorkerCfg::parse(cfg)?; 70 let keys = setup::load(&cfg)?; 71 let client = AuthClient::new(client, &cfg.api_url, &cfg.consumer).upgrade(&keys.access_token); 72 73 if transient { 74 let mut conn = pool.acquire().await?; 75 let account = client.account(cfg.payto.bban()).await?; 76 Worker { 77 client: &client, 78 db: &mut conn, 79 account_number: &account.number, 80 account_code: account.code, 81 key: &keys.signing_key, 82 account_type: cfg.account_type, 83 ignore_tx_before: cfg.ignore_tx_before, 84 ignore_bounces_before: cfg.ignore_bounces_before, 85 } 86 .run() 87 .await?; 88 return Ok(()); 89 } 90 91 let mut jitter = ExpoBackoffDecorr::default(); 92 93 loop { 94 let res: WorkerResult = async { 95 let account = client.account(cfg.payto.bban()).await?; 96 let db = &mut PgListener::connect_with(pool).await?; 97 98 // Listen to all channels 99 db.listen_all(["transfer"]).await?; 100 101 info!(target: "worker", "running at initialisation"); 102 103 loop { 104 debug!(target: "worker", "running"); 105 Worker { 106 client: &client, 107 db: db.acquire().await?, 108 account_number: &account.number, 109 account_code: account.code, 110 key: &keys.signing_key, 111 account_type: cfg.account_type, 112 ignore_tx_before: cfg.ignore_tx_before, 113 ignore_bounces_before: cfg.ignore_bounces_before, 114 } 115 .run() 116 .await?; 117 jitter.reset(); 118 119 // Wait for notifications or sync timeout 120 if let Ok(res) = tokio::time::timeout(cfg.frequency, db.try_recv()).await { 121 let mut ntf = res?; 122 // Conflate all notifications 123 while let Some(n) = ntf { 124 debug!(target: "worker", "notification from {}", n.channel()); 125 ntf = db.next_buffered(); 126 } 127 128 if ntf.is_some() { 129 info!(target: "worker", "running at db trigger"); 130 } else { 131 info!(target: "worker", "running at frequency"); 132 } 133 } 134 } 135 } 136 .await; 137 let err = res.unwrap_err(); 138 error!(target: "worker", "{err}"); 139 140 if matches!(err, WorkerError::Concurrency) { 141 // This error won't resolve by itself easily and it mean we are actually making progress 142 // in another worker so we can jitter more aggressively 143 tokio::time::sleep(Duration::from_secs(15)).await; 144 } 145 tokio::time::sleep(jitter.backoff()).await; 146 } 147 } 148 149 pub struct Worker<'a> { 150 pub client: &'a ApiClient<'a>, 151 pub db: &'a mut PgConnection, 152 pub account_number: &'a str, 153 pub account_code: u64, 154 pub key: &'a EcdsaKeyPair, 155 pub account_type: AccountType, 156 pub ignore_tx_before: Option<Date>, 157 pub ignore_bounces_before: Option<Date>, 158 } 159 160 impl Worker<'_> { 161 /// Run a single worker pass 162 pub async fn run(&mut self) -> WorkerResult { 163 // Some worker operations are not idempotent, therefore it's not safe to have multiple worker 164 // running concurrently. We use a global Postgres advisory lock to prevent it. 165 if !db::worker_lock(self.db).await? { 166 return Err(WorkerError::Concurrency); 167 }; 168 169 // Sync transactions 170 let mut next: Option<Next> = None; //kv_get(&mut *self.db, TXS_CURSOR_KEY).await?; TODO cursor logic is broken and cannot be stored & reused 171 let mut all_final = true; 172 let mut first = true; 173 loop { 174 let page = self 175 .client 176 .page_tx( 177 Direction::Both, 178 Order::Ascending, 179 100, 180 self.account_number, 181 &next, 182 first, 183 ) 184 .await?; 185 first = false; 186 next = page.next; 187 for item in page.list { 188 all_final &= item.tx.status.is_final(); 189 let tx = extract_tx_info(item.tx); 190 match tx { 191 Tx::In(tx_in) => { 192 // We only register final successful incoming transactions 193 if tx_in.status != TxStatus::Completed { 194 debug!(target: "worker", "pending or failed in {tx_in}"); 195 continue; 196 } 197 198 if let Some(before) = self.ignore_tx_before 199 && tx_in.value_date < before 200 { 201 debug!(target: "worker", "ignore in {tx_in}"); 202 continue; 203 } 204 let bounce = async |db: &mut PgConnection, 205 reason: &str| 206 -> Result<(), WorkerError> { 207 if let Some(before) = self.ignore_bounces_before 208 && tx_in.value_date < before 209 { 210 match db::register_tx_in(db, &tx_in, &None, &Timestamp::now()) 211 .await? 212 { 213 AddIncomingResult::Success { new, .. } => { 214 if new { 215 info!(target: "worker", "in {tx_in} skip bounce: {reason}"); 216 } else { 217 trace!(target: "worker", "in {tx_in} already skip bounce "); 218 } 219 } 220 AddIncomingResult::ReservePubReuse 221 | AddIncomingResult::UnknownMapping 222 | AddIncomingResult::MappingReuse => unreachable!(), 223 } 224 } else { 225 let res = db::register_bounce_tx_in( 226 db, 227 &tx_in, 228 reason, 229 &Timestamp::now(), 230 ) 231 .await?; 232 233 if res.tx_new { 234 info!(target: "worker", 235 "in {tx_in} bounced in {}: {reason}", 236 res.bounce_id 237 ); 238 } else { 239 trace!(target: "worker", 240 "in {tx_in} already seen and bounced in {}: {reason}", 241 res.bounce_id 242 ); 243 } 244 } 245 Ok(()) 246 }; 247 match self.account_type { 248 AccountType::Exchange => { 249 match parse_incoming_unstructured(&tx_in.subject) { 250 Ok(None) => bounce(self.db, "missing public key").await?, 251 Ok(Some(subject)) => match db::register_tx_in( 252 self.db, 253 &tx_in, 254 &Some(subject), 255 &Timestamp::now(), 256 ) 257 .await? 258 { 259 AddIncomingResult::Success { new, .. } => { 260 if new { 261 info!(target: "worker", "in {tx_in}"); 262 } else { 263 trace!(target: "worker", "in {tx_in} already seen"); 264 } 265 } 266 AddIncomingResult::ReservePubReuse => { 267 bounce(self.db, "reserve pub reuse").await? 268 } 269 AddIncomingResult::UnknownMapping => { 270 bounce(self.db, "unknown mapping").await? 271 } 272 AddIncomingResult::MappingReuse => { 273 bounce(self.db, "mapping reuse").await? 274 } 275 }, 276 Err(e) => bounce(self.db, &e.to_string()).await?, 277 } 278 } 279 AccountType::Normal => { 280 match db::register_tx_in(self.db, &tx_in, &None, &Timestamp::now()) 281 .await? 282 { 283 AddIncomingResult::Success { new, .. } => { 284 if new { 285 info!(target: "worker", "in {tx_in}"); 286 } else { 287 trace!(target: "worker", "in {tx_in} already seen"); 288 } 289 } 290 AddIncomingResult::ReservePubReuse 291 | AddIncomingResult::UnknownMapping 292 | AddIncomingResult::MappingReuse => unreachable!(), 293 } 294 } 295 } 296 } 297 Tx::Out(tx_out) => { 298 match tx_out.status { 299 TxStatus::ToBeRecorded => { 300 self.recover_tx(&tx_out).await?; 301 continue; 302 } 303 TxStatus::PendingFirstSignature 304 | TxStatus::PendingSecondSignature 305 | TxStatus::PendingProcessing 306 | TxStatus::Verified 307 | TxStatus::PartiallyCompleted 308 | TxStatus::UnderReview => { 309 // Still pending 310 debug!(target: "worker", "pending out {tx_out}"); 311 continue; 312 } 313 TxStatus::Rejected | TxStatus::Canceled | TxStatus::Completed => {} 314 } 315 match self.account_type { 316 AccountType::Exchange => { 317 let kind = if let Ok(subject) = 318 subject::parse_outgoing(&tx_out.subject) 319 { 320 TxOutKind::Talerable(subject) 321 } else if let Ok(bounced) = parse_bounce_outgoing(&tx_out.subject) { 322 TxOutKind::Bounce(bounced) 323 } else { 324 TxOutKind::Simple 325 }; 326 if tx_out.status == TxStatus::Completed { 327 let res = db::register_tx_out( 328 self.db, 329 &tx_out, 330 &kind, 331 &Timestamp::now(), 332 ) 333 .await?; 334 match res.result { 335 RegisterResult::idempotent => match kind { 336 TxOutKind::Simple => { 337 trace!(target: "worker", "out malformed {tx_out} already seen") 338 } 339 TxOutKind::Bounce(_) => { 340 trace!(target: "worker", "out bounce {tx_out} already seen") 341 } 342 TxOutKind::Talerable(_) => { 343 trace!(target: "worker", "out {tx_out} already seen") 344 } 345 }, 346 RegisterResult::known => match kind { 347 TxOutKind::Simple => { 348 warn!(target: "worker", "out malformed {tx_out}") 349 } 350 TxOutKind::Bounce(_) => { 351 info!(target: "worker", "out bounce {tx_out}") 352 } 353 TxOutKind::Talerable(_) => { 354 info!(target: "worker", "out {tx_out}") 355 } 356 }, 357 RegisterResult::recovered => match kind { 358 TxOutKind::Simple => { 359 warn!(target: "worker", "out malformed (recovered) {tx_out}") 360 } 361 TxOutKind::Bounce(_) => { 362 warn!(target: "worker", "out bounce (recovered) {tx_out}") 363 } 364 TxOutKind::Talerable(_) => { 365 warn!(target: "worker", "out (recovered) {tx_out}") 366 } 367 }, 368 } 369 } else { 370 let bounced = match kind { 371 TxOutKind::Simple => None, 372 TxOutKind::Bounce(bounced) => Some(bounced), 373 TxOutKind::Talerable(_) => None, 374 }; 375 let res = db::register_tx_out_failure( 376 self.db, 377 tx_out.code, 378 bounced, 379 &Timestamp::now(), 380 ) 381 .await?; 382 if let Some(id) = res.initiated_id { 383 if res.new { 384 error!(target: "worker", "out failure {id} {tx_out}"); 385 } else { 386 trace!(target: "worker", "out failure {id} {tx_out} already seen"); 387 } 388 } 389 } 390 } 391 AccountType::Normal => { 392 if tx_out.status == TxStatus::Completed { 393 let res = db::register_tx_out( 394 self.db, 395 &tx_out, 396 &TxOutKind::Simple, 397 &Timestamp::now(), 398 ) 399 .await?; 400 match res.result { 401 RegisterResult::idempotent => { 402 trace!(target: "worker", "out {tx_out} already seen"); 403 } 404 RegisterResult::known => { 405 info!(target: "worker", "out {tx_out}"); 406 } 407 RegisterResult::recovered => { 408 warn!(target: "worker", "out (recovered) {tx_out}"); 409 } 410 } 411 } else { 412 let res = db::register_tx_out_failure( 413 self.db, 414 tx_out.code, 415 None, 416 &Timestamp::now(), 417 ) 418 .await?; 419 if let Some(id) = res.initiated_id { 420 if res.new { 421 error!(target: "worker", "out failure {id} {tx_out}"); 422 } else { 423 trace!(target: "worker", "out failure {id} {tx_out} already seen"); 424 } 425 } 426 } 427 } 428 } 429 } 430 } 431 } 432 433 if let Some(_next) = &next { 434 // Update in db cursor only if all previous transactions where final 435 if all_final { 436 // debug!(target: "worker", "advance cursor {next:?}"); 437 // kv_set(&mut *self.db, TXS_CURSOR_KEY, &next).await?; TODO cursor is broken 438 } 439 } else { 440 break; 441 } 442 } 443 444 // Send transactions 445 let start = Timestamp::now(); 446 let now = Zoned::now(); 447 loop { 448 let batch = db::pending_batch(&mut *self.db, &start).await?; 449 if batch.is_empty() { 450 break; 451 } 452 for tx in batch { 453 debug!(target: "worker", "send tx {tx}"); 454 self.init_tx(&tx, &now).await?; 455 } 456 } 457 Ok(()) 458 } 459 460 /// Try to sign an unsigned initiated transaction 461 pub async fn recover_tx(&mut self, tx: &TxOut) -> WorkerResult { 462 if db::initiated_exists_for_code(&mut *self.db, tx.code) 463 .await? 464 .is_some() 465 { 466 // Known initiated we submit it 467 assert_eq!(tx.amount.frac, 0); 468 self.submit_tx( 469 tx.code, 470 -(tx.amount.val as f64), 471 &tx.value_date, 472 tx.creditor.bban(), 473 ) 474 .await?; 475 } else { 476 // The transaction is unknown (we failed after creating it and before storing it in the db) 477 // we delete it 478 self.client.delete_tx(tx.code).await?; 479 debug!(target: "worker", "out {}: delete uncompleted orphan", tx.code); 480 } 481 482 Ok(()) 483 } 484 485 /// Create and sign a forint transfer 486 pub async fn init_tx(&mut self, tx: &Initiated, now: &Zoned) -> WorkerResult { 487 trace!(target: "worker", "init tx {tx}"); 488 assert_eq!(tx.amount.frac, 0); 489 let date = now.date(); 490 // Initialize the new transaction, on failure an orphan initiated transaction can be created 491 let res = self 492 .client 493 .init_tx( 494 self.account_code, 495 tx.amount.val as f64, 496 &tx.subject, 497 &date, 498 &tx.creditor.name, 499 tx.creditor.bban(), 500 ) 501 .await; 502 fail_point("init-tx")?; 503 let info = match res { 504 // Check if succeeded 505 Ok(info) => { 506 // Update transaction status, on failure the initiated transaction will be orphan 507 db::initiated_submit_success(&mut *self.db, tx.id, &Timestamp::now(), info.code) 508 .await?; 509 info 510 } 511 Err(e) => { 512 if let MagnetErr::Magnet(e) = &e.err { 513 // Check if error is permanent 514 if matches!( 515 (e.error_code, e.short_message.as_str()), 516 (404, "BSZLA_NEM_TALALHATO") // Unknown account 517 | (409, "FORRAS_SZAMLA_ESZAMLA_EGYEZIK") // Same account 518 ) { 519 db::initiated_submit_permanent_failure( 520 &mut *self.db, 521 tx.id, 522 &Timestamp::now(), 523 &e.to_string(), 524 ) 525 .await?; 526 error!(target: "worker", "initiated failure {tx}: {e}"); 527 return WorkerResult::Ok(()); 528 } 529 } 530 return Err(e.into()); 531 } 532 }; 533 trace!(target: "worker", "init tx {}", info.code); 534 535 // Sign transaction 536 self.submit_tx(info.code, info.amount, &date, tx.creditor.bban()) 537 .await?; 538 Ok(()) 539 } 540 541 /** Submit an initiated forint transfer */ 542 pub async fn submit_tx( 543 &mut self, 544 tx_code: u64, 545 amount: f64, 546 date: &Date, 547 creditor: &str, 548 ) -> WorkerResult { 549 debug!(target: "worker", "submit tx {tx_code}"); 550 fail_point("submit-tx")?; 551 // Submit an initiated transaction, on failure we will retry 552 match self 553 .client 554 .submit_tx( 555 self.key, 556 self.account_number, 557 tx_code, 558 amount, 559 date, 560 creditor, 561 ) 562 .await 563 { 564 Ok(_) => Ok(()), 565 Err(e) => { 566 if let MagnetErr::Magnet(e) = &e.err { 567 // Check if soft failure 568 if matches!( 569 (e.error_code, e.short_message.as_str()), 570 (409, "TRANZAKCIO_ROSSZ_STATUS") // Already summited or cannot be signed 571 ) { 572 warn!(target: "worker", "submit tx {tx_code}: {e}"); 573 return Ok(()); 574 } 575 } 576 Err(e.into()) 577 } 578 } 579 } 580 } 581 582 pub enum Tx { 583 In(TxIn), 584 Out(TxOut), 585 } 586 587 pub fn extract_tx_info(tx: TxDto) -> Tx { 588 // TODO amount from f64 without allocations 589 let amount = amount::amount(format!("{}:{}", tx.currency, tx.amount.abs())); 590 // TODO we should support non hungarian account and error handling 591 let iban = if tx.counter_account.starts_with("HU") { 592 let iban: IBAN = tx.counter_account.parse().unwrap(); 593 HuIban::try_from(iban).unwrap() 594 } else { 595 HuIban::from_bban(&tx.counter_account).unwrap() 596 }; 597 let counter_account = FullHuPayto::new(iban, &tx.counter_name); 598 if tx.amount.is_sign_positive() { 599 Tx::In(TxIn { 600 code: tx.code, 601 amount, 602 subject: tx.subject.unwrap_or_default(), 603 debtor: counter_account, 604 value_date: tx.value_date, 605 status: tx.status, 606 }) 607 } else { 608 Tx::Out(TxOut { 609 code: tx.code, 610 amount, 611 subject: tx.subject.unwrap_or_default(), 612 creditor: counter_account, 613 value_date: tx.value_date, 614 status: tx.status, 615 }) 616 } 617 } 618 619 #[derive(Debug, thiserror::Error)] 620 pub enum BounceSubjectErr { 621 #[error("missing parts")] 622 MissingParts, 623 #[error("not a bounce")] 624 NotBounce, 625 #[error("malformed bounced id: {0}")] 626 Id(#[from] ParseIntError), 627 } 628 629 pub fn parse_bounce_outgoing(subject: &str) -> Result<u32, BounceSubjectErr> { 630 let (prefix, id) = subject 631 .rsplit_once(" ") 632 .ok_or(BounceSubjectErr::MissingParts)?; 633 if !prefix.starts_with("bounce") { 634 return Err(BounceSubjectErr::NotBounce); 635 } 636 let id: u32 = id.parse()?; 637 Ok(id) 638 }