taler-rust

GNU Taler code in Rust. Largely core banking integrations.
Log | Files | Refs | Submodules | README | LICENSE

worker.rs (27187B)


      1 /*
      2   This file is part of TALER
      3   Copyright (C) 2025, 2026 Taler Systems SA
      4 
      5   TALER is free software; you can redistribute it and/or modify it under the
      6   terms of the GNU Affero General Public License as published by the Free Software
      7   Foundation; either version 3, or (at your option) any later version.
      8 
      9   TALER is distributed in the hope that it will be useful, but WITHOUT ANY
     10   WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
     11   A PARTICULAR PURPOSE.  See the GNU Affero General Public License for more details.
     12 
     13   You should have received a copy of the GNU Affero General Public License along with
     14   TALER; see the file COPYING.  If not, see <http://www.gnu.org/licenses/>
     15 */
     16 
     17 use std::{num::ParseIntError, time::Duration};
     18 
     19 use aws_lc_rs::signature::EcdsaKeyPair;
     20 use failure_injection::{InjectedErr, fail_point};
     21 use http_client::ApiErr;
     22 use jiff::{Timestamp, Zoned, civil::Date};
     23 use sqlx::{Acquire as _, PgConnection, PgPool, postgres::PgListener};
     24 use taler_api::subject::{self, parse_incoming_unstructured};
     25 use taler_common::{
     26     ExpoBackoffDecorr,
     27     config::Config,
     28     types::{
     29         amount::{self},
     30         iban::IBAN,
     31     },
     32 };
     33 use tracing::{debug, error, info, trace, warn};
     34 
     35 use crate::{
     36     FullHuPayto, HuIban,
     37     config::{AccountType, WorkerCfg},
     38     db::{self, AddIncomingResult, Initiated, RegisterResult, TxIn, TxOut, TxOutKind},
     39     magnet_api::{
     40         api::MagnetErr,
     41         client::{ApiClient, AuthClient},
     42         types::{Direction, Next, Order, TxDto, TxStatus},
     43     },
     44     setup,
     45 };
     46 
     47 // const TXS_CURSOR_KEY: &str = "txs_cursor"; TODO cursor is broken
     48 
/// Errors that can abort a worker pass.
#[derive(Debug, thiserror::Error)]
pub enum WorkerError {
    /// A database operation failed.
    #[error(transparent)]
    Db(#[from] sqlx::Error),
    /// A call to the Magnet API failed.
    #[error(transparent)]
    Api(#[from] ApiErr<MagnetErr>),
    /// The worker lock could not be obtained, see [`Worker::run`].
    #[error("Another worker is running concurrently")]
    Concurrency,
    /// A failure injected through a `fail_point` (presumably only used by tests — TODO confirm).
    #[error(transparent)]
    Injected(#[from] InjectedErr),
}
     60 
/// Result of a worker operation that produces no value on success.
pub type WorkerResult = Result<(), WorkerError>;
     62 
     63 pub async fn run_worker(
     64     cfg: &Config,
     65     pool: &PgPool,
     66     client: &http_client::Client,
     67     transient: bool,
     68 ) -> anyhow::Result<()> {
     69     let cfg = WorkerCfg::parse(cfg)?;
     70     let keys = setup::load(&cfg)?;
     71     let client = AuthClient::new(client, &cfg.api_url, &cfg.consumer).upgrade(&keys.access_token);
     72 
     73     if transient {
     74         let mut conn = pool.acquire().await?;
     75         let account = client.account(cfg.payto.bban()).await?;
     76         Worker {
     77             client: &client,
     78             db: &mut conn,
     79             account_number: &account.number,
     80             account_code: account.code,
     81             key: &keys.signing_key,
     82             account_type: cfg.account_type,
     83             ignore_tx_before: cfg.ignore_tx_before,
     84             ignore_bounces_before: cfg.ignore_bounces_before,
     85         }
     86         .run()
     87         .await?;
     88         return Ok(());
     89     }
     90 
     91     let mut jitter = ExpoBackoffDecorr::default();
     92 
     93     loop {
     94         let res: WorkerResult = async {
     95             let account = client.account(cfg.payto.bban()).await?;
     96             let db = &mut PgListener::connect_with(pool).await?;
     97 
     98             // Listen to all channels
     99             db.listen_all(["transfer"]).await?;
    100 
    101             info!(target: "worker", "running at initialisation");
    102 
    103             loop {
    104                 debug!(target: "worker", "running");
    105                 Worker {
    106                     client: &client,
    107                     db: db.acquire().await?,
    108                     account_number: &account.number,
    109                     account_code: account.code,
    110                     key: &keys.signing_key,
    111                     account_type: cfg.account_type,
    112                     ignore_tx_before: cfg.ignore_tx_before,
    113                     ignore_bounces_before: cfg.ignore_bounces_before,
    114                 }
    115                 .run()
    116                 .await?;
    117                 jitter.reset();
    118 
    119                 // Wait for notifications or sync timeout
    120                 if let Ok(res) = tokio::time::timeout(cfg.frequency, db.try_recv()).await {
    121                     let mut ntf = res?;
    122                     // Conflate all notifications
    123                     while let Some(n) = ntf {
    124                         debug!(target: "worker", "notification from {}", n.channel());
    125                         ntf = db.next_buffered();
    126                     }
    127 
    128                     if ntf.is_some() {
    129                         info!(target: "worker", "running at db trigger");
    130                     } else {
    131                         info!(target: "worker", "running at frequency");
    132                     }
    133                 }
    134             }
    135         }
    136         .await;
    137         let err = res.unwrap_err();
    138         error!(target: "worker", "{err}");
    139 
    140         if matches!(err, WorkerError::Concurrency) {
    141             // This error won't resolve by itself easily and it mean we are actually making progress
    142             // in another worker so we can jitter more aggressively
    143             tokio::time::sleep(Duration::from_secs(15)).await;
    144         }
    145         tokio::time::sleep(jitter.backoff()).await;
    146     }
    147 }
    148 
/// State borrowed by a single worker pass.
pub struct Worker<'a> {
    /// Authenticated Magnet API client
    pub client: &'a ApiClient<'a>,
    /// Database connection used for every query of the pass
    pub db: &'a mut PgConnection,
    /// Account number, used to list transactions and to submit transfers
    pub account_number: &'a str,
    /// Account code, used to initiate transfers
    pub account_code: u64,
    /// Key used when submitting a transfer (presumably to sign it — TODO confirm)
    pub key: &'a EcdsaKeyPair,
    /// Selects how incoming and outgoing transactions are registered
    pub account_type: AccountType,
    /// Incoming transactions with a value date before this one are not registered
    pub ignore_tx_before: Option<Date>,
    /// Incoming transactions with a value date before this one are registered instead of bounced
    pub ignore_bounces_before: Option<Date>,
}
    159 
    160 impl Worker<'_> {
    /// Run a single worker pass
    ///
    /// The pass first takes the worker lock, then pages through the account
    /// transactions to register them in the database, and finally sends the
    /// pending initiated transfers.
    ///
    /// Fails with [`WorkerError::Concurrency`] when the lock cannot be obtained,
    /// and with the first database, API or injected error otherwise.
    pub async fn run(&mut self) -> WorkerResult {
        // Some worker operations are not idempotent, therefore it's not safe to have multiple worker
        // running concurrently. We use a global Postgres advisory lock to prevent it.
        if !db::worker_lock(self.db).await? {
            return Err(WorkerError::Concurrency);
        };

        // Sync transactions
        let mut next: Option<Next> = None; //kv_get(&mut *self.db, TXS_CURSOR_KEY).await?; TODO cursor logic is broken and cannot be stored & reused
        let mut all_final = true;
        let mut first = true;
        // Fetch pages in ascending order until the API stops returning a cursor
        // (100 is presumably the page size — TODO confirm)
        loop {
            let page = self
                .client
                .page_tx(
                    Direction::Both,
                    Order::Ascending,
                    100,
                    self.account_number,
                    &next,
                    first,
                )
                .await?;
            first = false;
            next = page.next;
            for item in page.list {
                all_final &= item.tx.status.is_final();
                let tx = extract_tx_info(item.tx);
                match tx {
                    Tx::In(tx_in) => {
                        // We only register final successful incoming transactions
                        if tx_in.status != TxStatus::Completed {
                            debug!(target: "worker", "pending or failed in {tx_in}");
                            continue;
                        }

                        if let Some(before) = self.ignore_tx_before
                            && tx_in.value_date < before
                        {
                            debug!(target: "worker", "ignore in {tx_in}");
                            continue;
                        }
                        // Handle an incoming transaction that cannot be credited: bounce it,
                        // unless it is older than `ignore_bounces_before`, in which case it is
                        // only registered, without subject.
                        let bounce = async |db: &mut PgConnection,
                                            reason: &str|
                               -> Result<(), WorkerError> {
                            if let Some(before) = self.ignore_bounces_before
                                && tx_in.value_date < before
                            {
                                match db::register_tx_in(db, &tx_in, &None, &Timestamp::now())
                                    .await?
                                {
                                    AddIncomingResult::Success { new, .. } => {
                                        if new {
                                            info!(target: "worker", "in  {tx_in} skip bounce: {reason}");
                                        } else {
                                            trace!(target: "worker", "in  {tx_in} already skip bounce ");
                                        }
                                    }
                                    // Presumably impossible when registering without subject
                                    AddIncomingResult::ReservePubReuse
                                    | AddIncomingResult::UnknownMapping
                                    | AddIncomingResult::MappingReuse => unreachable!(),
                                }
                            } else {
                                let res = db::register_bounce_tx_in(
                                    db,
                                    &tx_in,
                                    reason,
                                    &Timestamp::now(),
                                )
                                .await?;

                                if res.tx_new {
                                    info!(target: "worker",
                                        "in  {tx_in} bounced in {}: {reason}",
                                        res.bounce_id
                                    );
                                } else {
                                    trace!(target: "worker",
                                        "in  {tx_in} already seen and bounced in {}: {reason}",
                                        res.bounce_id
                                    );
                                }
                            }
                            Ok(())
                        };
                        match self.account_type {
                            // Exchange accounts require a valid subject, anything else is bounced
                            AccountType::Exchange => {
                                match parse_incoming_unstructured(&tx_in.subject) {
                                    Ok(None) => bounce(self.db, "missing public key").await?,
                                    Ok(Some(subject)) => match db::register_tx_in(
                                        self.db,
                                        &tx_in,
                                        &Some(subject),
                                        &Timestamp::now(),
                                    )
                                    .await?
                                    {
                                        AddIncomingResult::Success { new, .. } => {
                                            if new {
                                                info!(target: "worker", "in  {tx_in}");
                                            } else {
                                                trace!(target: "worker", "in  {tx_in} already seen");
                                            }
                                        }
                                        AddIncomingResult::ReservePubReuse => {
                                            bounce(self.db, "reserve pub reuse").await?
                                        }
                                        AddIncomingResult::UnknownMapping => {
                                            bounce(self.db, "unknown mapping").await?
                                        }
                                        AddIncomingResult::MappingReuse => {
                                            bounce(self.db, "mapping reuse").await?
                                        }
                                    },
                                    Err(e) => bounce(self.db, &e.to_string()).await?,
                                }
                            }
                            // Normal accounts register every incoming transaction without subject
                            AccountType::Normal => {
                                match db::register_tx_in(self.db, &tx_in, &None, &Timestamp::now())
                                    .await?
                                {
                                    AddIncomingResult::Success { new, .. } => {
                                        if new {
                                            info!(target: "worker", "in  {tx_in}");
                                        } else {
                                            trace!(target: "worker", "in  {tx_in} already seen");
                                        }
                                    }
                                    // Presumably impossible when registering without subject
                                    AddIncomingResult::ReservePubReuse
                                    | AddIncomingResult::UnknownMapping
                                    | AddIncomingResult::MappingReuse => unreachable!(),
                                }
                            }
                        }
                    }
                    Tx::Out(tx_out) => {
                        match tx_out.status {
                            // Created but not yet submitted: submit it or delete it
                            TxStatus::ToBeRecorded => {
                                self.recover_tx(&tx_out).await?;
                                continue;
                            }
                            TxStatus::PendingFirstSignature
                            | TxStatus::PendingSecondSignature
                            | TxStatus::PendingProcessing
                            | TxStatus::Verified
                            | TxStatus::PartiallyCompleted
                            | TxStatus::UnderReview => {
                                // Still pending
                                debug!(target: "worker", "pending out {tx_out}");
                                continue;
                            }
                            // Final statuses are registered below
                            TxStatus::Rejected | TxStatus::Canceled | TxStatus::Completed => {}
                        }
                        match self.account_type {
                            AccountType::Exchange => {
                                // Classify the transfer from its subject
                                let kind = if let Ok(subject) =
                                    subject::parse_outgoing(&tx_out.subject)
                                {
                                    TxOutKind::Talerable(subject)
                                } else if let Ok(bounced) = parse_bounce_outgoing(&tx_out.subject) {
                                    TxOutKind::Bounce(bounced)
                                } else {
                                    TxOutKind::Simple
                                };
                                if tx_out.status == TxStatus::Completed {
                                    let res = db::register_tx_out(
                                        self.db,
                                        &tx_out,
                                        &kind,
                                        &Timestamp::now(),
                                    )
                                    .await?;
                                    match res.result {
                                        RegisterResult::idempotent => match kind {
                                            TxOutKind::Simple => {
                                                trace!(target: "worker", "out malformed {tx_out} already seen")
                                            }
                                            TxOutKind::Bounce(_) => {
                                                trace!(target: "worker", "out bounce {tx_out} already seen")
                                            }
                                            TxOutKind::Talerable(_) => {
                                                trace!(target: "worker", "out {tx_out} already seen")
                                            }
                                        },
                                        RegisterResult::known => match kind {
                                            TxOutKind::Simple => {
                                                warn!(target: "worker", "out malformed {tx_out}")
                                            }
                                            TxOutKind::Bounce(_) => {
                                                info!(target: "worker", "out bounce {tx_out}")
                                            }
                                            TxOutKind::Talerable(_) => {
                                                info!(target: "worker", "out {tx_out}")
                                            }
                                        },
                                        RegisterResult::recovered => match kind {
                                            TxOutKind::Simple => {
                                                warn!(target: "worker", "out malformed (recovered) {tx_out}")
                                            }
                                            TxOutKind::Bounce(_) => {
                                                warn!(target: "worker", "out bounce (recovered) {tx_out}")
                                            }
                                            TxOutKind::Talerable(_) => {
                                                warn!(target: "worker", "out (recovered) {tx_out}")
                                            }
                                        },
                                    }
                                } else {
                                    // Rejected or canceled: record the failure, keeping the
                                    // bounced id when the transfer was a bounce
                                    let bounced = match kind {
                                        TxOutKind::Simple => None,
                                        TxOutKind::Bounce(bounced) => Some(bounced),
                                        TxOutKind::Talerable(_) => None,
                                    };
                                    let res = db::register_tx_out_failure(
                                        self.db,
                                        tx_out.code,
                                        bounced,
                                        &Timestamp::now(),
                                    )
                                    .await?;
                                    if let Some(id) = res.initiated_id {
                                        if res.new {
                                            error!(target: "worker", "out failure {id} {tx_out}");
                                        } else {
                                            trace!(target: "worker", "out failure {id} {tx_out} already seen");
                                        }
                                    }
                                }
                            }
                            AccountType::Normal => {
                                if tx_out.status == TxStatus::Completed {
                                    let res = db::register_tx_out(
                                        self.db,
                                        &tx_out,
                                        &TxOutKind::Simple,
                                        &Timestamp::now(),
                                    )
                                    .await?;
                                    match res.result {
                                        RegisterResult::idempotent => {
                                            trace!(target: "worker", "out {tx_out} already seen");
                                        }
                                        RegisterResult::known => {
                                            info!(target: "worker", "out {tx_out}");
                                        }
                                        RegisterResult::recovered => {
                                            warn!(target: "worker", "out (recovered) {tx_out}");
                                        }
                                    }
                                } else {
                                    // Rejected or canceled: record the failure
                                    let res = db::register_tx_out_failure(
                                        self.db,
                                        tx_out.code,
                                        None,
                                        &Timestamp::now(),
                                    )
                                    .await?;
                                    if let Some(id) = res.initiated_id {
                                        if res.new {
                                            error!(target: "worker", "out failure {id} {tx_out}");
                                        } else {
                                            trace!(target: "worker", "out failure {id} {tx_out} already seen");
                                        }
                                    }
                                }
                            }
                        }
                    }
                }
            }

            if let Some(_next) = &next {
                // Update in db cursor only if all previous transactions were final
                // (currently a no-op as storing the cursor is disabled)
                if all_final {
                    // debug!(target: "worker", "advance cursor {next:?}");
                    // kv_set(&mut *self.db, TXS_CURSOR_KEY, &next).await?; TODO cursor is broken
                }
            } else {
                // No cursor means that the last page has been handled
                break;
            }
        }

        // Send transactions
        let start = Timestamp::now();
        // The date of `now` is used as the transfer date for the whole pass
        let now = Zoned::now();
        // Drain pending transfers batch by batch until none is left
        loop {
            let batch = db::pending_batch(&mut *self.db, &start).await?;
            if batch.is_empty() {
                break;
            }
            for tx in batch {
                debug!(target: "worker", "send tx {tx}");
                self.init_tx(&tx, &now).await?;
            }
        }
        Ok(())
    }
    459 
    460     /// Try to sign an unsigned initiated transaction
    461     pub async fn recover_tx(&mut self, tx: &TxOut) -> WorkerResult {
    462         if db::initiated_exists_for_code(&mut *self.db, tx.code)
    463             .await?
    464             .is_some()
    465         {
    466             // Known initiated we submit it
    467             assert_eq!(tx.amount.frac, 0);
    468             self.submit_tx(
    469                 tx.code,
    470                 -(tx.amount.val as f64),
    471                 &tx.value_date,
    472                 tx.creditor.bban(),
    473             )
    474             .await?;
    475         } else {
    476             // The transaction is unknown (we failed after creating it and before storing it in the db)
    477             // we delete it
    478             self.client.delete_tx(tx.code).await?;
    479             debug!(target: "worker", "out {}: delete uncompleted orphan", tx.code);
    480         }
    481 
    482         Ok(())
    483     }
    484 
    485     /// Create and sign a forint transfer
    486     pub async fn init_tx(&mut self, tx: &Initiated, now: &Zoned) -> WorkerResult {
    487         trace!(target: "worker", "init tx {tx}");
    488         assert_eq!(tx.amount.frac, 0);
    489         let date = now.date();
    490         // Initialize the new transaction, on failure an orphan initiated transaction can be created
    491         let res = self
    492             .client
    493             .init_tx(
    494                 self.account_code,
    495                 tx.amount.val as f64,
    496                 &tx.subject,
    497                 &date,
    498                 &tx.creditor.name,
    499                 tx.creditor.bban(),
    500             )
    501             .await;
    502         fail_point("init-tx")?;
    503         let info = match res {
    504             // Check if succeeded
    505             Ok(info) => {
    506                 // Update transaction status, on failure the initiated transaction will be orphan
    507                 db::initiated_submit_success(&mut *self.db, tx.id, &Timestamp::now(), info.code)
    508                     .await?;
    509                 info
    510             }
    511             Err(e) => {
    512                 if let MagnetErr::Magnet(e) = &e.err {
    513                     // Check if error is permanent
    514                     if matches!(
    515                         (e.error_code, e.short_message.as_str()),
    516                         (404, "BSZLA_NEM_TALALHATO") // Unknown account
    517                          | (409, "FORRAS_SZAMLA_ESZAMLA_EGYEZIK") // Same account
    518                     ) {
    519                         db::initiated_submit_permanent_failure(
    520                             &mut *self.db,
    521                             tx.id,
    522                             &Timestamp::now(),
    523                             &e.to_string(),
    524                         )
    525                         .await?;
    526                         error!(target: "worker", "initiated failure {tx}: {e}");
    527                         return WorkerResult::Ok(());
    528                     }
    529                 }
    530                 return Err(e.into());
    531             }
    532         };
    533         trace!(target: "worker", "init tx {}", info.code);
    534 
    535         // Sign transaction
    536         self.submit_tx(info.code, info.amount, &date, tx.creditor.bban())
    537             .await?;
    538         Ok(())
    539     }
    540 
    541     /** Submit an initiated forint transfer */
    542     pub async fn submit_tx(
    543         &mut self,
    544         tx_code: u64,
    545         amount: f64,
    546         date: &Date,
    547         creditor: &str,
    548     ) -> WorkerResult {
    549         debug!(target: "worker", "submit tx {tx_code}");
    550         fail_point("submit-tx")?;
    551         // Submit an initiated transaction, on failure we will retry
    552         match self
    553             .client
    554             .submit_tx(
    555                 self.key,
    556                 self.account_number,
    557                 tx_code,
    558                 amount,
    559                 date,
    560                 creditor,
    561             )
    562             .await
    563         {
    564             Ok(_) => Ok(()),
    565             Err(e) => {
    566                 if let MagnetErr::Magnet(e) = &e.err {
    567                     // Check if soft failure
    568                     if matches!(
    569                         (e.error_code, e.short_message.as_str()),
    570                         (409, "TRANZAKCIO_ROSSZ_STATUS") // Already summited or cannot be signed
    571                     ) {
    572                         warn!(target: "worker", "submit tx {tx_code}: {e}");
    573                         return Ok(());
    574                     }
    575                 }
    576                 Err(e.into())
    577             }
    578         }
    579     }
    580 }
    581 
/// A bank transaction classified by direction, as produced by [`extract_tx_info`].
pub enum Tx {
    /// Incoming transaction (amount with a positive sign)
    In(TxIn),
    /// Outgoing transaction (amount with a negative sign)
    Out(TxOut),
}
    586 
    587 pub fn extract_tx_info(tx: TxDto) -> Tx {
    588     // TODO amount from f64 without allocations
    589     let amount = amount::amount(format!("{}:{}", tx.currency, tx.amount.abs()));
    590     // TODO we should support non hungarian account and error handling
    591     let iban = if tx.counter_account.starts_with("HU") {
    592         let iban: IBAN = tx.counter_account.parse().unwrap();
    593         HuIban::try_from(iban).unwrap()
    594     } else {
    595         HuIban::from_bban(&tx.counter_account).unwrap()
    596     };
    597     let counter_account = FullHuPayto::new(iban, &tx.counter_name);
    598     if tx.amount.is_sign_positive() {
    599         Tx::In(TxIn {
    600             code: tx.code,
    601             amount,
    602             subject: tx.subject.unwrap_or_default(),
    603             debtor: counter_account,
    604             value_date: tx.value_date,
    605             status: tx.status,
    606         })
    607     } else {
    608         Tx::Out(TxOut {
    609             code: tx.code,
    610             amount,
    611             subject: tx.subject.unwrap_or_default(),
    612             creditor: counter_account,
    613             value_date: tx.value_date,
    614             status: tx.status,
    615         })
    616     }
    617 }
    618 
/// Reasons why a subject is not a bounce subject, see [`parse_bounce_outgoing`].
#[derive(Debug, thiserror::Error)]
pub enum BounceSubjectErr {
    /// The subject contains no space to split on
    #[error("missing parts")]
    MissingParts,
    /// The part before the last space does not start with `bounce`
    #[error("not a bounce")]
    NotBounce,
    /// The part after the last space is not a valid `u32`
    #[error("malformed bounced id: {0}")]
    Id(#[from] ParseIntError),
}
    628 
    629 pub fn parse_bounce_outgoing(subject: &str) -> Result<u32, BounceSubjectErr> {
    630     let (prefix, id) = subject
    631         .rsplit_once(" ")
    632         .ok_or(BounceSubjectErr::MissingParts)?;
    633     if !prefix.starts_with("bounce") {
    634         return Err(BounceSubjectErr::NotBounce);
    635     }
    636     let id: u32 = id.parse()?;
    637     Ok(id)
    638 }