worker.rs (21212B)
1 /* 2 This file is part of TALER 3 Copyright (C) 2025, 2026 Taler Systems SA 4 5 TALER is free software; you can redistribute it and/or modify it under the 6 terms of the GNU Affero General Public License as published by the Free Software 7 Foundation; either version 3, or (at your option) any later version. 8 9 TALER is distributed in the hope that it will be useful, but WITHOUT ANY 10 WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR 11 A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. 12 13 You should have received a copy of the GNU Affero General Public License along with 14 TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> 15 */ 16 17 use std::time::Duration; 18 19 use failure_injection::{InjectedErr, fail_point}; 20 use http_client::ApiErr; 21 use jiff::Timestamp; 22 use sqlx::{Acquire as _, PgConnection, PgPool, postgres::PgListener}; 23 use taler_api::subject::{self, parse_incoming_unstructured}; 24 use taler_common::{ 25 ExpoBackoffDecorr, 26 config::Config, 27 types::amount::{self, Currency}, 28 }; 29 use tokio::{join, sync::Notify}; 30 use tracing::{debug, error, info, trace, warn}; 31 32 use crate::{ 33 config::{AccountType, WorkerCfg}, 34 constants::SYNC_CURSOR_KEY, 35 cyclos_api::{ 36 api::{CyclosAuth, CyclosErr}, 37 client::Client, 38 types::{AccountKind, HistoryItem, InputError, NotFoundError, OrderBy}, 39 }, 40 db::{ 41 self, AddIncomingResult, ChargebackFailureResult, RegisterResult, TxIn, TxOut, TxOutKind, 42 kv_get, kv_set, 43 }, 44 notification::watch_notification, 45 }; 46 47 #[derive(Debug, thiserror::Error)] 48 pub enum WorkerError { 49 #[error(transparent)] 50 Db(#[from] sqlx::Error), 51 #[error(transparent)] 52 Api(#[from] ApiErr<CyclosErr>), 53 #[error("Another worker is running concurrently")] 54 Concurrency, 55 #[error(transparent)] 56 Injected(#[from] InjectedErr), 57 } 58 59 pub type WorkerResult = Result<(), WorkerError>; 60 61 pub async fn run_worker( 62 cfg: &Config, 63 pool: &PgPool, 64 client: &http_client::Client, 65 transient: bool, 66 ) -> anyhow::Result<()> { 67 let cfg = WorkerCfg::parse(cfg)?; 68 let client = Client { 69 client, 70 api_url: &cfg.host.api_url, 71 auth: &CyclosAuth::Basic { 72 username: cfg.host.username, 73 password: cfg.host.password, 74 }, 75 }; 76 if transient { 77 let mut conn = pool.acquire().await?; 78 Worker { 79 client: &client, 80 db: &mut conn, 81 account_type_id: *cfg.account_type_id, 82 payment_type_id: *cfg.payment_type_id, 83 account_type: cfg.account_type, 84 currency: cfg.currency.clone(), 85 } 86 .run() 87 .await?; 88 return Ok(()); 89 } 90 91 let notification = Notify::new(); 92 93 let watcher = async { 94 watch_notification(&client, ¬ification).await; 95 }; 96 let worker = async { 97 let mut jitter = ExpoBackoffDecorr::default(); 98 let mut skip_notifications: bool = true; 99 loop { 100 info!(target: "worker", "running at initialisation"); 101 let res: WorkerResult = async { 102 let db = &mut PgListener::connect_with(pool).await?; 103 104 // Listen to all channels 105 db.listen_all(["transfer"]).await?; 106 107 loop { 108 if !skip_notifications { 109 tokio::select! { 110 _ = tokio::time::sleep(cfg.frequency) => { 111 info!(target: "worker", "running at frequency"); 112 } 113 res = db.try_recv() => { 114 let mut ntf = res?; 115 // Conflate all notifications 116 while let Some(n) = ntf { 117 debug!(target: "worker", "notification from {}", n.channel()); 118 ntf = db.next_buffered(); 119 } 120 info!(target: "worker", "running at db trigger"); 121 } 122 _ = notification.notified() => { 123 info!(target: "worker", "running at notification trigger"); 124 } 125 }; 126 } 127 skip_notifications = false; 128 Worker { 129 client: &client, 130 db: db.acquire().await?, 131 account_type_id: *cfg.account_type_id, 132 payment_type_id: *cfg.payment_type_id, 133 account_type: cfg.account_type, 134 currency: cfg.currency.clone(), 135 } 136 .run() 137 .await?; 138 jitter.reset(); 139 } 140 } 141 .await; 142 let err = res.unwrap_err(); 143 error!(target: "worker", "{err}"); 144 145 match err { 146 WorkerError::Concurrency => { 147 // This error won't resolve by itself easily and it mean we are actually making progress 148 // in another worker so we can jitter more aggressively 149 tokio::time::sleep(Duration::from_secs(15)).await; 150 skip_notifications = false; 151 } 152 WorkerError::Api(ApiErr { 153 ctx: _, 154 err: CyclosErr::Input(InputError::Validation { .. }), 155 }) => { 156 // In case of validation failure we do not want to retry right away as it can DOS the service 157 skip_notifications = false; 158 } 159 WorkerError::Api(_) | WorkerError::Db(_) | WorkerError::Injected(_) => { 160 skip_notifications = true; 161 } 162 } 163 tokio::time::sleep(jitter.backoff()).await; 164 } 165 }; 166 join!(watcher, worker); // TODO try_join 167 Ok(()) 168 } 169 170 pub struct Worker<'a> { 171 pub client: &'a Client<'a>, 172 pub db: &'a mut PgConnection, 173 pub currency: Currency, 174 pub account_type_id: i64, 175 pub payment_type_id: i64, 176 pub account_type: AccountType, 177 } 178 179 impl Worker<'_> { 180 /// Run a single worker pass 181 pub async fn run(&mut self) -> WorkerResult { 182 // Some worker operations are not idempotent, therefore it's not safe to have multiple worker 183 // running concurrently. We use a global Postgres advisory lock to prevent it. 184 if !db::worker_lock(self.db).await? { 185 return Err(WorkerError::Concurrency); 186 }; 187 188 // Sync transactions 189 let mut cursor: Timestamp = kv_get(&mut *self.db, SYNC_CURSOR_KEY) 190 .await? 191 .unwrap_or_default(); 192 193 loop { 194 let page = self 195 .client 196 .history(self.account_type_id, OrderBy::DateAsc, 0, Some(cursor)) 197 .await?; 198 for transfer in page.page { 199 if transfer.date > cursor { 200 cursor = transfer.date; 201 } 202 let tx = extract_tx_info(transfer); 203 match tx { 204 Tx::In(tx_in) => self.ingest_in(tx_in).await?, 205 Tx::Out(tx_out) => self.ingest_out(tx_out).await?, 206 } 207 } 208 209 kv_set(&mut *self.db, SYNC_CURSOR_KEY, &cursor).await?; 210 211 if !page.has_next_page { 212 break; 213 } 214 } 215 216 // Send transactions 217 let start = Timestamp::now(); 218 loop { 219 let batch = db::pending_batch(&mut *self.db, &start).await?; 220 if batch.is_empty() { 221 break; 222 } 223 for initiated in batch { 224 debug!(target: "worker", "send tx {initiated}"); 225 let res = self 226 .client 227 .direct_payment( 228 initiated.creditor_id, 229 self.payment_type_id, 230 initiated.amount, 231 &initiated.subject, 232 ) 233 .await; 234 fail_point("direct-payment")?; 235 match res { 236 Ok(tx) => { 237 // Update transaction status, on failure the initiated transaction will be orphan 238 db::initiated_submit_success( 239 &mut *self.db, 240 initiated.id, 241 &tx.date, 242 tx.id.0, 243 ) 244 .await?; 245 trace!(target: "worker", "init tx {}", tx.id); 246 } 247 Err(e) => { 248 let msg = match e.err { 249 CyclosErr::Unknown(NotFoundError { entity_type, key }) => { 250 format!("unknown {entity_type} {key}") 251 } 252 CyclosErr::Forbidden(err) => err.to_string(), 253 _ => return Err(e.into()), 254 }; 255 // TODO is permission should be considered are hard or soft failure ? 256 db::initiated_submit_permanent_failure(&mut *self.db, initiated.id, &msg) 257 .await?; 258 error!(target: "worker", "initiated failure {initiated}: {msg}"); 259 } 260 } 261 } 262 } 263 Ok(()) 264 } 265 266 /// Ingest an incoming transaction 267 async fn ingest_in(&mut self, tx: TxIn) -> WorkerResult { 268 match self.account_type { 269 AccountType::Exchange => { 270 let transfer = self.client.transfer(tx.transfer_id).await?; 271 let bounce = async |db: &mut PgConnection, 272 reason: &str| 273 -> Result<(), WorkerError> { 274 // Fetch existing transaction 275 if let Some(chargeback) = transfer.charged_back_by { 276 let res = db::register_bounced_tx_in( 277 db, 278 &tx, 279 *chargeback.id, 280 reason, 281 &Timestamp::now(), 282 ) 283 .await?; 284 if res.tx_new { 285 info!(target: "worker", 286 "in {tx} bounced (recovered) in {}: {reason}", chargeback.id 287 ); 288 } else { 289 trace!(target: "worker", 290 "in {tx} already seen and bounced in {}: {reason}",chargeback.id 291 ); 292 } 293 } else if !transfer.can_chargeback { 294 match db::register_tx_in(db, &tx, &None, &Timestamp::now()).await? { 295 AddIncomingResult::Success { new, .. } => { 296 if new { 297 warn!(target: "worker", "in {tx} cannot bounce: {reason}"); 298 } else { 299 trace!(target: "worker", "in {tx} already seen and cannot bounce "); 300 } 301 } 302 AddIncomingResult::ReservePubReuse 303 | AddIncomingResult::UnknownMapping 304 | AddIncomingResult::MappingReuse => unreachable!(), 305 } 306 } else { 307 let chargeback_id = self.client.chargeback(*transfer.id).await?; 308 fail_point("chargeback")?; 309 let res = db::register_bounced_tx_in( 310 db, 311 &tx, 312 chargeback_id, 313 reason, 314 &Timestamp::now(), 315 ) 316 .await?; 317 if res.tx_new { 318 info!(target: "worker", "in {tx} bounced in {chargeback_id}: {reason}"); 319 } else { 320 trace!(target: "worker", "in {tx} already seen and bounced in {chargeback_id}: {reason}"); 321 } 322 } 323 Ok(()) 324 }; 325 if let Some(chargeback) = transfer.chargeback_of { 326 // This a chargeback of one of our transaction, if we bounce we might enter a loop 327 match db::initiated_chargeback_failure(&mut *self.db, *chargeback.id).await? { 328 ChargebackFailureResult::Unknown => { 329 trace!(target: "worker", "initiated failure unknown: charged back") 330 } 331 ChargebackFailureResult::Known(initiated) => { 332 error!(target: "worker", "initiated failure {initiated}: charged back") 333 } 334 ChargebackFailureResult::Idempotent(initiated) => { 335 trace!(target: "worker", "initiated failure {initiated} already seen: charged back") 336 } 337 } 338 // Sill register the incoming transaction as an incoming one 339 match db::register_tx_in(self.db, &tx, &None, &Timestamp::now()).await? { 340 AddIncomingResult::Success { new, .. } => { 341 if new { 342 info!(target: "worker", "in {tx} chargeback"); 343 } else { 344 trace!(target: "worker", "in {tx} chargeback already seen"); 345 } 346 } 347 AddIncomingResult::ReservePubReuse 348 | AddIncomingResult::UnknownMapping 349 | AddIncomingResult::MappingReuse => unreachable!(), 350 } 351 352 return Ok(()); 353 } 354 match parse_incoming_unstructured(&tx.subject) { 355 Ok(None) => bounce(self.db, "missing public key").await?, 356 Ok(Some(subject)) => { 357 match db::register_tx_in(self.db, &tx, &Some(subject), &Timestamp::now()) 358 .await? 359 { 360 AddIncomingResult::Success { new, .. } => { 361 if new { 362 info!(target: "worker", "in {tx}"); 363 } else { 364 trace!(target: "worker", "in {tx} already seen"); 365 } 366 } 367 AddIncomingResult::ReservePubReuse => { 368 bounce(self.db, "reserve pub reuse").await? 369 } 370 AddIncomingResult::UnknownMapping => { 371 bounce(self.db, "unknown mapping").await? 372 } 373 AddIncomingResult::MappingReuse => { 374 bounce(self.db, "mapping reuse").await? 375 } 376 } 377 } 378 Err(e) => bounce(self.db, &e.to_string()).await?, 379 } 380 } 381 AccountType::Normal => { 382 match db::register_tx_in(self.db, &tx, &None, &Timestamp::now()).await? { 383 AddIncomingResult::Success { new, .. } => { 384 if new { 385 info!(target: "worker", "in {tx}"); 386 } else { 387 trace!(target: "worker", "in {tx} already seen"); 388 } 389 } 390 AddIncomingResult::ReservePubReuse 391 | AddIncomingResult::UnknownMapping 392 | AddIncomingResult::MappingReuse => unreachable!(), 393 } 394 } 395 } 396 Ok(()) 397 } 398 399 async fn ingest_out(&mut self, tx: TxOut) -> WorkerResult { 400 match self.account_type { 401 AccountType::Exchange => { 402 let transfer = self.client.transfer(tx.transfer_id).await?; 403 404 if transfer.charged_back_by.is_some() { 405 match db::initiated_chargeback_failure(&mut *self.db, *transfer.id).await? { 406 ChargebackFailureResult::Unknown => { 407 trace!(target: "worker", "initiated failure unknown: charged back") 408 } 409 ChargebackFailureResult::Known(initiated) => { 410 error!(target: "worker", "initiated failure {initiated}: charged back") 411 } 412 ChargebackFailureResult::Idempotent(initiated) => { 413 trace!(target: "worker", "initiated failure {initiated} already seen: charged back") 414 } 415 } 416 } 417 418 let kind = if let Ok(subject) = subject::parse_outgoing(&tx.subject) { 419 TxOutKind::Talerable(subject) 420 } else if let Some(chargeback) = &transfer.chargeback_of { 421 TxOutKind::Bounce(*chargeback.id) 422 } else { 423 TxOutKind::Simple 424 }; 425 426 let res = db::register_tx_out(self.db, &tx, &kind, &Timestamp::now()).await?; 427 match res.result { 428 RegisterResult::idempotent => match kind { 429 TxOutKind::Simple => { 430 trace!(target: "worker", "out malformed {tx} already seen") 431 } 432 TxOutKind::Bounce(_) => { 433 trace!(target: "worker", "out bounce {tx} already seen") 434 } 435 TxOutKind::Talerable(_) => { 436 trace!(target: "worker", "out {tx} already seen") 437 } 438 }, 439 RegisterResult::known => match kind { 440 TxOutKind::Simple => { 441 warn!(target: "worker", "out malformed {tx}") 442 } 443 TxOutKind::Bounce(_) => { 444 info!(target: "worker", "out bounce {tx}") 445 } 446 TxOutKind::Talerable(_) => { 447 info!(target: "worker", "out {tx}") 448 } 449 }, 450 RegisterResult::recovered => match kind { 451 TxOutKind::Simple => { 452 warn!(target: "worker", "out malformed (recovered) {tx}") 453 } 454 TxOutKind::Bounce(_) => { 455 warn!(target: "worker", "out bounce (recovered) {tx}") 456 } 457 TxOutKind::Talerable(_) => { 458 warn!(target: "worker", "out (recovered) {tx}") 459 } 460 }, 461 } 462 } 463 AccountType::Normal => { 464 let res = db::register_tx_out(self.db, &tx, &TxOutKind::Simple, &Timestamp::now()) 465 .await?; 466 match res.result { 467 RegisterResult::idempotent => { 468 trace!(target: "worker", "out {tx} already seen"); 469 } 470 RegisterResult::known => { 471 info!(target: "worker", "out {tx}"); 472 } 473 RegisterResult::recovered => { 474 warn!(target: "worker", "out (recovered) {tx}"); 475 } 476 } 477 } 478 } 479 Ok(()) 480 } 481 } 482 483 pub enum Tx { 484 In(TxIn), 485 Out(TxOut), 486 } 487 488 pub fn extract_tx_info(tx: HistoryItem) -> Tx { 489 let amount = amount::decimal(tx.amount.trim_start_matches('-')); 490 let (id, name) = match tx.related_account.kind { 491 AccountKind::System => (tx.related_account.ty.id, tx.related_account.ty.name), 492 AccountKind::User { user } => (user.id, user.display), 493 }; 494 if tx.amount.starts_with("-") { 495 Tx::Out(TxOut { 496 transfer_id: *tx.id, 497 tx_id: tx.transaction.map(|it| *it.id), 498 amount, 499 subject: tx.description.unwrap_or_default(), 500 creditor_id: *id, 501 creditor_name: name, 502 valued_at: tx.date, 503 }) 504 } else { 505 Tx::In(TxIn { 506 transfer_id: *tx.id, 507 tx_id: tx.transaction.map(|it| *it.id), 508 amount, 509 subject: tx.description.unwrap_or_default(), 510 debtor_id: *id, 511 debtor_name: name, 512 valued_at: tx.date, 513 }) 514 } 515 }