taler-rust

GNU Taler code in Rust. Largely core banking integrations.
Log | Files | Refs | Submodules | README | LICENSE

worker.rs (27087B)


      1 /*
      2   This file is part of TALER
      3   Copyright (C) 2025, 2026 Taler Systems SA
      4 
      5   TALER is free software; you can redistribute it and/or modify it under the
      6   terms of the GNU Affero General Public License as published by the Free Software
      7   Foundation; either version 3, or (at your option) any later version.
      8 
      9   TALER is distributed in the hope that it will be useful, but WITHOUT ANY
     10   WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
     11   A PARTICULAR PURPOSE.  See the GNU Affero General Public License for more details.
     12 
     13   You should have received a copy of the GNU Affero General Public License along with
     14   TALER; see the file COPYING.  If not, see <http://www.gnu.org/licenses/>
     15 */
     16 
     17 use std::{num::ParseIntError, time::Duration};
     18 
     19 use aws_lc_rs::signature::EcdsaKeyPair;
     20 use failure_injection::{InjectedErr, fail_point};
     21 use http_client::ApiErr;
     22 use jiff::{Timestamp, Zoned, civil::Date};
     23 use sqlx::{Acquire as _, PgConnection, PgPool, postgres::PgListener};
     24 use taler_api::subject::{self, parse_incoming_unstructured};
     25 use taler_common::{
     26     ExpoBackoffDecorr,
     27     config::Config,
     28     types::{
     29         amount::{self},
     30         iban::IBAN,
     31     },
     32 };
     33 use tracing::{debug, error, info, trace, warn};
     34 
     35 use crate::{
     36     FullHuPayto, HuIban,
     37     config::{AccountType, WorkerCfg},
     38     db::{self, AddIncomingResult, Initiated, RegisterResult, TxIn, TxOut, TxOutKind},
     39     magnet_api::{
     40         api::MagnetErr,
     41         client::{ApiClient, AuthClient},
     42         types::{Direction, Next, Order, TxDto, TxStatus},
     43     },
     44     setup,
     45 };
     46 
     47 // const TXS_CURSOR_KEY: &str = "txs_cursor"; TODO cursor is broken
     48 
     49 #[derive(Debug, thiserror::Error)]
     50 pub enum WorkerError {
     51     #[error(transparent)]
     52     Db(#[from] sqlx::Error),
     53     #[error(transparent)]
     54     Api(#[from] ApiErr<MagnetErr>),
     55     #[error("Another worker is running concurrently")]
     56     Concurrency,
     57     #[error(transparent)]
     58     Injected(#[from] InjectedErr),
     59 }
     60 
     61 pub type WorkerResult = Result<(), WorkerError>;
     62 
     63 pub async fn run_worker(
     64     cfg: &Config,
     65     pool: &PgPool,
     66     client: &http_client::Client,
     67     transient: bool,
     68 ) -> anyhow::Result<()> {
     69     let cfg = WorkerCfg::parse(cfg)?;
     70     let keys = setup::load(&cfg)?;
     71     let client = AuthClient::new(client, &cfg.api_url, &cfg.consumer).upgrade(&keys.access_token);
     72 
     73     if transient {
     74         let mut conn = pool.acquire().await?;
     75         let account = client.account(cfg.payto.bban()).await?;
     76         Worker {
     77             client: &client,
     78             db: &mut conn,
     79             account_number: &account.number,
     80             account_code: account.code,
     81             key: &keys.signing_key,
     82             account_type: cfg.account_type,
     83             ignore_tx_before: cfg.ignore_tx_before,
     84             ignore_bounces_before: cfg.ignore_bounces_before,
     85         }
     86         .run()
     87         .await?;
     88         return Ok(());
     89     }
     90 
     91     let mut jitter = ExpoBackoffDecorr::default();
     92 
     93     loop {
     94         let res: WorkerResult = async {
     95             let account = client.account(cfg.payto.bban()).await?;
     96             let db = &mut PgListener::connect_with(pool).await?;
     97 
     98             // Listen to all channels
     99             db.listen_all(["transfer"]).await?;
    100 
    101             info!(target: "worker", "running at initialisation");
    102 
    103             loop {
    104                 debug!(target: "worker", "running");
    105                 Worker {
    106                     client: &client,
    107                     db: db.acquire().await?,
    108                     account_number: &account.number,
    109                     account_code: account.code,
    110                     key: &keys.signing_key,
    111                     account_type: cfg.account_type,
    112                     ignore_tx_before: cfg.ignore_tx_before,
    113                     ignore_bounces_before: cfg.ignore_bounces_before,
    114                 }
    115                 .run()
    116                 .await?;
    117                 jitter.reset();
    118 
    119                 // Wait for notifications or sync timeout
    120                 if let Ok(res) = tokio::time::timeout(cfg.frequency, db.try_recv()).await {
    121                     let mut ntf = res?;
    122                     // Conflate all notifications
    123                     while let Some(n) = ntf {
    124                         debug!(target: "worker", "notification from {}", n.channel());
    125                         ntf = db.next_buffered();
    126                     }
    127 
    128                     if ntf.is_some() {
    129                         info!(target: "worker", "running at db trigger");
    130                     } else {
    131                         info!(target: "worker", "running at frequency");
    132                     }
    133                 }
    134             }
    135         }
    136         .await;
    137         let err = res.unwrap_err();
    138         error!(target: "worker", "{err}");
    139 
    140         if matches!(err, WorkerError::Concurrency) {
    141             // This error won't resolve by itself easily and it mean we are actually making progress
    142             // in another worker so we can jitter more aggressively
    143             tokio::time::sleep(Duration::from_secs(15)).await;
    144         }
    145         tokio::time::sleep(jitter.backoff()).await;
    146     }
    147 }
    148 
    149 pub struct Worker<'a> {
    150     pub client: &'a ApiClient<'a>,
    151     pub db: &'a mut PgConnection,
    152     pub account_number: &'a str,
    153     pub account_code: u64,
    154     pub key: &'a EcdsaKeyPair,
    155     pub account_type: AccountType,
    156     pub ignore_tx_before: Option<Date>,
    157     pub ignore_bounces_before: Option<Date>,
    158 }
    159 
    160 impl Worker<'_> {
    161     /// Run a single worker pass
    162     pub async fn run(&mut self) -> WorkerResult {
    163         // Some worker operations are not idempotent, therefore it's not safe to have multiple worker
    164         // running concurrently. We use a global Postgres advisory lock to prevent it.
    165         if !db::worker_lock(self.db).await? {
    166             return Err(WorkerError::Concurrency);
    167         };
    168 
    169         // Sync transactions
    170         let mut next: Option<Next> = None; //kv_get(&mut *self.db, TXS_CURSOR_KEY).await?; TODO cursor logic is broken and cannot be stored & reused
    171         let mut all_final = true;
    172         let mut first = true;
    173         loop {
    174             let page = self
    175                 .client
    176                 .page_tx(
    177                     Direction::Both,
    178                     Order::Ascending,
    179                     100,
    180                     self.account_number,
    181                     &next,
    182                     first,
    183                 )
    184                 .await?;
    185             first = false;
    186             next = page.next;
    187             for item in page.list {
    188                 all_final &= item.tx.status.is_final();
    189                 let tx = extract_tx_info(item.tx);
    190                 match tx {
    191                     Tx::In(tx_in) => {
    192                         // We only register final successful incoming transactions
    193                         if tx_in.status != TxStatus::Completed {
    194                             debug!(target: "worker", "pending or failed in {tx_in}");
    195                             continue;
    196                         }
    197 
    198                         if let Some(before) = self.ignore_tx_before
    199                             && tx_in.value_date < before
    200                         {
    201                             debug!(target: "worker", "ignore in {tx_in}");
    202                             continue;
    203                         }
    204                         let bounce = async |db: &mut PgConnection,
    205                                             reason: &str|
    206                                -> Result<(), WorkerError> {
    207                             if let Some(before) = self.ignore_bounces_before
    208                                 && tx_in.value_date < before
    209                             {
    210                                 match db::register_tx_in(db, &tx_in, &None, &Timestamp::now())
    211                                     .await?
    212                                 {
    213                                     AddIncomingResult::Success { new, .. } => {
    214                                         if new {
    215                                             info!(target: "worker", "in  {tx_in} skip bounce: {reason}");
    216                                         } else {
    217                                             trace!(target: "worker", "in  {tx_in} already skip bounce ");
    218                                         }
    219                                     }
    220                                     AddIncomingResult::ReservePubReuse
    221                                     | AddIncomingResult::UnknownMapping
    222                                     | AddIncomingResult::MappingReuse => unreachable!(),
    223                                 }
    224                             } else {
    225                                 let res = db::register_bounce_tx_in(
    226                                     db,
    227                                     &tx_in,
    228                                     reason,
    229                                     &Timestamp::now(),
    230                                 )
    231                                 .await?;
    232 
    233                                 if res.tx_new {
    234                                     info!(target: "worker",
    235                                         "in  {tx_in} bounced in {}: {reason}",
    236                                         res.bounce_id
    237                                     );
    238                                 } else {
    239                                     trace!(target: "worker",
    240                                         "in  {tx_in} already seen and bounced in {}: {reason}",
    241                                         res.bounce_id
    242                                     );
    243                                 }
    244                             }
    245                             Ok(())
    246                         };
    247                         match self.account_type {
    248                             AccountType::Exchange => {
    249                                 match parse_incoming_unstructured(&tx_in.subject) {
    250                                     Ok(subject) => match db::register_tx_in(
    251                                         self.db,
    252                                         &tx_in,
    253                                         &Some(subject),
    254                                         &Timestamp::now(),
    255                                     )
    256                                     .await?
    257                                     {
    258                                         AddIncomingResult::Success { new, .. } => {
    259                                             if new {
    260                                                 info!(target: "worker", "in  {tx_in}");
    261                                             } else {
    262                                                 trace!(target: "worker", "in  {tx_in} already seen");
    263                                             }
    264                                         }
    265                                         AddIncomingResult::ReservePubReuse => {
    266                                             bounce(self.db, "reserve pub reuse").await?
    267                                         }
    268                                         AddIncomingResult::UnknownMapping => {
    269                                             bounce(self.db, "unknown mapping").await?
    270                                         }
    271                                         AddIncomingResult::MappingReuse => {
    272                                             bounce(self.db, "mapping reuse").await?
    273                                         }
    274                                     },
    275                                     Err(e) => bounce(self.db, &e.to_string()).await?,
    276                                 }
    277                             }
    278                             AccountType::Normal => {
    279                                 match db::register_tx_in(self.db, &tx_in, &None, &Timestamp::now())
    280                                     .await?
    281                                 {
    282                                     AddIncomingResult::Success { new, .. } => {
    283                                         if new {
    284                                             info!(target: "worker", "in  {tx_in}");
    285                                         } else {
    286                                             trace!(target: "worker", "in  {tx_in} already seen");
    287                                         }
    288                                     }
    289                                     AddIncomingResult::ReservePubReuse
    290                                     | AddIncomingResult::UnknownMapping
    291                                     | AddIncomingResult::MappingReuse => unreachable!(),
    292                                 }
    293                             }
    294                         }
    295                     }
    296                     Tx::Out(tx_out) => {
    297                         match tx_out.status {
    298                             TxStatus::ToBeRecorded => {
    299                                 self.recover_tx(&tx_out).await?;
    300                                 continue;
    301                             }
    302                             TxStatus::PendingFirstSignature
    303                             | TxStatus::PendingSecondSignature
    304                             | TxStatus::PendingProcessing
    305                             | TxStatus::Verified
    306                             | TxStatus::PartiallyCompleted
    307                             | TxStatus::UnderReview => {
    308                                 // Still pending
    309                                 debug!(target: "worker", "pending out {tx_out}");
    310                                 continue;
    311                             }
    312                             TxStatus::Rejected | TxStatus::Canceled | TxStatus::Completed => {}
    313                         }
    314                         match self.account_type {
    315                             AccountType::Exchange => {
    316                                 let kind = if let Ok(subject) =
    317                                     subject::parse_outgoing(&tx_out.subject)
    318                                 {
    319                                     TxOutKind::Talerable(subject)
    320                                 } else if let Ok(bounced) = parse_bounce_outgoing(&tx_out.subject) {
    321                                     TxOutKind::Bounce(bounced)
    322                                 } else {
    323                                     TxOutKind::Simple
    324                                 };
    325                                 if tx_out.status == TxStatus::Completed {
    326                                     let res = db::register_tx_out(
    327                                         self.db,
    328                                         &tx_out,
    329                                         &kind,
    330                                         &Timestamp::now(),
    331                                     )
    332                                     .await?;
    333                                     match res.result {
    334                                         RegisterResult::idempotent => match kind {
    335                                             TxOutKind::Simple => {
    336                                                 trace!(target: "worker", "out malformed {tx_out} already seen")
    337                                             }
    338                                             TxOutKind::Bounce(_) => {
    339                                                 trace!(target: "worker", "out bounce {tx_out} already seen")
    340                                             }
    341                                             TxOutKind::Talerable(_) => {
    342                                                 trace!(target: "worker", "out {tx_out} already seen")
    343                                             }
    344                                         },
    345                                         RegisterResult::known => match kind {
    346                                             TxOutKind::Simple => {
    347                                                 warn!(target: "worker", "out malformed {tx_out}")
    348                                             }
    349                                             TxOutKind::Bounce(_) => {
    350                                                 info!(target: "worker", "out bounce {tx_out}")
    351                                             }
    352                                             TxOutKind::Talerable(_) => {
    353                                                 info!(target: "worker", "out {tx_out}")
    354                                             }
    355                                         },
    356                                         RegisterResult::recovered => match kind {
    357                                             TxOutKind::Simple => {
    358                                                 warn!(target: "worker", "out malformed (recovered) {tx_out}")
    359                                             }
    360                                             TxOutKind::Bounce(_) => {
    361                                                 warn!(target: "worker", "out bounce (recovered) {tx_out}")
    362                                             }
    363                                             TxOutKind::Talerable(_) => {
    364                                                 warn!(target: "worker", "out (recovered) {tx_out}")
    365                                             }
    366                                         },
    367                                     }
    368                                 } else {
    369                                     let bounced = match kind {
    370                                         TxOutKind::Simple => None,
    371                                         TxOutKind::Bounce(bounced) => Some(bounced),
    372                                         TxOutKind::Talerable(_) => None,
    373                                     };
    374                                     let res = db::register_tx_out_failure(
    375                                         self.db,
    376                                         tx_out.code,
    377                                         bounced,
    378                                         &Timestamp::now(),
    379                                     )
    380                                     .await?;
    381                                     if let Some(id) = res.initiated_id {
    382                                         if res.new {
    383                                             error!(target: "worker", "out failure {id} {tx_out}");
    384                                         } else {
    385                                             trace!(target: "worker", "out failure {id} {tx_out} already seen");
    386                                         }
    387                                     }
    388                                 }
    389                             }
    390                             AccountType::Normal => {
    391                                 if tx_out.status == TxStatus::Completed {
    392                                     let res = db::register_tx_out(
    393                                         self.db,
    394                                         &tx_out,
    395                                         &TxOutKind::Simple,
    396                                         &Timestamp::now(),
    397                                     )
    398                                     .await?;
    399                                     match res.result {
    400                                         RegisterResult::idempotent => {
    401                                             trace!(target: "worker", "out {tx_out} already seen");
    402                                         }
    403                                         RegisterResult::known => {
    404                                             info!(target: "worker", "out {tx_out}");
    405                                         }
    406                                         RegisterResult::recovered => {
    407                                             warn!(target: "worker", "out (recovered) {tx_out}");
    408                                         }
    409                                     }
    410                                 } else {
    411                                     let res = db::register_tx_out_failure(
    412                                         self.db,
    413                                         tx_out.code,
    414                                         None,
    415                                         &Timestamp::now(),
    416                                     )
    417                                     .await?;
    418                                     if let Some(id) = res.initiated_id {
    419                                         if res.new {
    420                                             error!(target: "worker", "out failure {id} {tx_out}");
    421                                         } else {
    422                                             trace!(target: "worker", "out failure {id} {tx_out} already seen");
    423                                         }
    424                                     }
    425                                 }
    426                             }
    427                         }
    428                     }
    429                 }
    430             }
    431 
    432             if let Some(_next) = &next {
    433                 // Update in db cursor only if all previous transactions where final
    434                 if all_final {
    435                     // debug!(target: "worker", "advance cursor {next:?}");
    436                     // kv_set(&mut *self.db, TXS_CURSOR_KEY, &next).await?; TODO cursor is broken
    437                 }
    438             } else {
    439                 break;
    440             }
    441         }
    442 
    443         // Send transactions
    444         let start = Timestamp::now();
    445         let now = Zoned::now();
    446         loop {
    447             let batch = db::pending_batch(&mut *self.db, &start).await?;
    448             if batch.is_empty() {
    449                 break;
    450             }
    451             for tx in batch {
    452                 debug!(target: "worker", "send tx {tx}");
    453                 self.init_tx(&tx, &now).await?;
    454             }
    455         }
    456         Ok(())
    457     }
    458 
    459     /// Try to sign an unsigned initiated transaction
    460     pub async fn recover_tx(&mut self, tx: &TxOut) -> WorkerResult {
    461         if db::initiated_exists_for_code(&mut *self.db, tx.code)
    462             .await?
    463             .is_some()
    464         {
    465             // Known initiated we submit it
    466             assert_eq!(tx.amount.frac, 0);
    467             self.submit_tx(
    468                 tx.code,
    469                 -(tx.amount.val as f64),
    470                 &tx.value_date,
    471                 tx.creditor.bban(),
    472             )
    473             .await?;
    474         } else {
    475             // The transaction is unknown (we failed after creating it and before storing it in the db)
    476             // we delete it
    477             self.client.delete_tx(tx.code).await?;
    478             debug!(target: "worker", "out {}: delete uncompleted orphan", tx.code);
    479         }
    480 
    481         Ok(())
    482     }
    483 
    484     /// Create and sign a forint transfer
    485     pub async fn init_tx(&mut self, tx: &Initiated, now: &Zoned) -> WorkerResult {
    486         trace!(target: "worker", "init tx {tx}");
    487         assert_eq!(tx.amount.frac, 0);
    488         let date = now.date();
    489         // Initialize the new transaction, on failure an orphan initiated transaction can be created
    490         let res = self
    491             .client
    492             .init_tx(
    493                 self.account_code,
    494                 tx.amount.val as f64,
    495                 &tx.subject,
    496                 &date,
    497                 &tx.creditor.name,
    498                 tx.creditor.bban(),
    499             )
    500             .await;
    501         fail_point("init-tx")?;
    502         let info = match res {
    503             // Check if succeeded
    504             Ok(info) => {
    505                 // Update transaction status, on failure the initiated transaction will be orphan
    506                 db::initiated_submit_success(&mut *self.db, tx.id, &Timestamp::now(), info.code)
    507                     .await?;
    508                 info
    509             }
    510             Err(e) => {
    511                 if let MagnetErr::Magnet(e) = &e.err {
    512                     // Check if error is permanent
    513                     if matches!(
    514                         (e.error_code, e.short_message.as_str()),
    515                         (404, "BSZLA_NEM_TALALHATO") // Unknown account
    516                          | (409, "FORRAS_SZAMLA_ESZAMLA_EGYEZIK") // Same account
    517                     ) {
    518                         db::initiated_submit_permanent_failure(
    519                             &mut *self.db,
    520                             tx.id,
    521                             &Timestamp::now(),
    522                             &e.to_string(),
    523                         )
    524                         .await?;
    525                         error!(target: "worker", "initiated failure {tx}: {e}");
    526                         return WorkerResult::Ok(());
    527                     }
    528                 }
    529                 return Err(e.into());
    530             }
    531         };
    532         trace!(target: "worker", "init tx {}", info.code);
    533 
    534         // Sign transaction
    535         self.submit_tx(info.code, info.amount, &date, tx.creditor.bban())
    536             .await?;
    537         Ok(())
    538     }
    539 
    540     /** Submit an initiated forint transfer */
    541     pub async fn submit_tx(
    542         &mut self,
    543         tx_code: u64,
    544         amount: f64,
    545         date: &Date,
    546         creditor: &str,
    547     ) -> WorkerResult {
    548         debug!(target: "worker", "submit tx {tx_code}");
    549         fail_point("submit-tx")?;
    550         // Submit an initiated transaction, on failure we will retry
    551         match self
    552             .client
    553             .submit_tx(
    554                 self.key,
    555                 self.account_number,
    556                 tx_code,
    557                 amount,
    558                 date,
    559                 creditor,
    560             )
    561             .await
    562         {
    563             Ok(_) => Ok(()),
    564             Err(e) => {
    565                 if let MagnetErr::Magnet(e) = &e.err {
    566                     // Check if soft failure
    567                     if matches!(
    568                         (e.error_code, e.short_message.as_str()),
    569                         (409, "TRANZAKCIO_ROSSZ_STATUS") // Already summited or cannot be signed
    570                     ) {
    571                         warn!(target: "worker", "submit tx {tx_code}: {e}");
    572                         return Ok(());
    573                     }
    574                 }
    575                 Err(e.into())
    576             }
    577         }
    578     }
    579 }
    580 
    581 pub enum Tx {
    582     In(TxIn),
    583     Out(TxOut),
    584 }
    585 
    586 pub fn extract_tx_info(tx: TxDto) -> Tx {
    587     // TODO amount from f64 without allocations
    588     let amount = amount::amount(format!("{}:{}", tx.currency, tx.amount.abs()));
    589     // TODO we should support non hungarian account and error handling
    590     let iban = if tx.counter_account.starts_with("HU") {
    591         let iban: IBAN = tx.counter_account.parse().unwrap();
    592         HuIban::try_from(iban).unwrap()
    593     } else {
    594         HuIban::from_bban(&tx.counter_account).unwrap()
    595     };
    596     let counter_account = FullHuPayto::new(iban, &tx.counter_name);
    597     if tx.amount.is_sign_positive() {
    598         Tx::In(TxIn {
    599             code: tx.code,
    600             amount,
    601             subject: tx.subject.unwrap_or_default(),
    602             debtor: counter_account,
    603             value_date: tx.value_date,
    604             status: tx.status,
    605         })
    606     } else {
    607         Tx::Out(TxOut {
    608             code: tx.code,
    609             amount,
    610             subject: tx.subject.unwrap_or_default(),
    611             creditor: counter_account,
    612             value_date: tx.value_date,
    613             status: tx.status,
    614         })
    615     }
    616 }
    617 
    618 #[derive(Debug, thiserror::Error)]
    619 pub enum BounceSubjectErr {
    620     #[error("missing parts")]
    621     MissingParts,
    622     #[error("not a bounce")]
    623     NotBounce,
    624     #[error("malformed bounced id: {0}")]
    625     Id(#[from] ParseIntError),
    626 }
    627 
    628 pub fn parse_bounce_outgoing(subject: &str) -> Result<u32, BounceSubjectErr> {
    629     let (prefix, id) = subject
    630         .rsplit_once(" ")
    631         .ok_or(BounceSubjectErr::MissingParts)?;
    632     if !prefix.starts_with("bounce") {
    633         return Err(BounceSubjectErr::NotBounce);
    634     }
    635     let id: u32 = id.parse()?;
    636     Ok(id)
    637 }