taler-rust

GNU Taler code in Rust. Largely core banking integrations.
Log | Files | Refs | Submodules | README | LICENSE

api.rs (15794B)


      1 /*
      2   This file is part of TALER
      3   Copyright (C) 2025, 2026 Taler Systems SA
      4 
      5   TALER is free software; you can redistribute it and/or modify it under the
      6   terms of the GNU Affero General Public License as published by the Free Software
      7   Foundation; either version 3, or (at your option) any later version.
      8 
      9   TALER is distributed in the hope that it will be useful, but WITHOUT ANY
     10   WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
     11   A PARTICULAR PURPOSE.  See the GNU Affero General Public License for more details.
     12 
     13   You should have received a copy of the GNU Affero General Public License along with
     14   TALER; see the file COPYING.  If not, see <http://www.gnu.org/licenses/>
     15 */
     16 
     17 use jiff::Timestamp;
     18 use taler_api::{
     19     api::{TalerApi, revenue::Revenue, transfer::WireTransferGateway, wire::WireGateway},
     20     error::{ApiResult, failure, failure_code},
     21     subject::{IncomingSubject, fmt_in_subject},
     22 };
     23 use taler_common::{
     24     api_common::{SafeU64, safe_u64},
     25     api_params::{History, Page},
     26     api_revenue::RevenueIncomingHistory,
     27     api_transfer::{
     28         RegistrationRequest, RegistrationResponse, SubjectFormat, TransferSubject, Unregistration,
     29     },
     30     api_wire::{
     31         AddIncomingRequest, AddIncomingResponse, AddKycauthRequest, AddKycauthResponse,
     32         IncomingHistory, OutgoingHistory, TransferList, TransferRequest, TransferResponse,
     33         TransferState, TransferStatus,
     34     },
     35     db::IncomingType,
     36     error_code::ErrorCode,
     37     types::{payto::PaytoURI, timestamp::TalerTimestamp, utils::date_to_utc_ts},
     38 };
     39 use tokio::sync::watch::Sender;
     40 
     41 use crate::{
     42     FullHuPayto,
     43     constants::CURRENCY,
     44     db::{self, AddIncomingResult, Transfer, TxInAdmin},
     45 };
     46 
/// Shared state backing the Taler HTTP API implementations for the Magnet Bank adapter.
///
/// The four `Sender<i64>` fields are Tokio watch channels that [`MagnetApi::start`] creates
/// with an initial value of `0` and hands to `db::notification_listener`.
pub struct MagnetApi {
    /// PostgreSQL connection pool passed to every `db::` call.
    pub pool: sqlx::PgPool,
    /// Payto URI of the account, returned as the debit/credit account in list and history
    /// responses.
    pub payto: PaytoURI,
    /// Watch channel the revenue history endpoint subscribes to.
    pub in_channel: Sender<i64>,
    /// Watch channel the wire gateway incoming history endpoint subscribes to.
    pub taler_in_channel: Sender<i64>,
    /// Watch channel handed to `db::notification_listener`; no endpoint visible here
    /// subscribes to it.
    pub out_channel: Sender<i64>,
    /// Watch channel the wire gateway outgoing history endpoint subscribes to.
    pub taler_out_channel: Sender<i64>,
}
     55 
     56 impl MagnetApi {
     57     pub async fn start(pool: sqlx::PgPool, payto: PaytoURI) -> Self {
     58         let in_channel = Sender::new(0);
     59         let taler_in_channel = Sender::new(0);
     60         let out_channel = Sender::new(0);
     61         let taler_out_channel = Sender::new(0);
     62         let tmp = Self {
     63             pool: pool.clone(),
     64             payto,
     65             in_channel: in_channel.clone(),
     66             taler_in_channel: taler_in_channel.clone(),
     67             out_channel: out_channel.clone(),
     68             taler_out_channel: taler_out_channel.clone(),
     69         };
     70         tokio::spawn(db::notification_listener(
     71             pool,
     72             in_channel,
     73             taler_in_channel,
     74             out_channel,
     75             taler_out_channel,
     76         ));
     77         tmp
     78     }
     79 }
     80 
impl TalerApi for MagnetApi {
    /// Currency handled by this API, taken from the crate-level `CURRENCY` constant.
    fn currency(&self) -> &str {
        CURRENCY.as_ref()
    }

    /// URN identifying this implementation.
    fn implementation(&self) -> &'static str {
        "urn:net:taler:specs:taler-magnet-bank:taler-rust"
    }
}
     90 
     91 impl WireGateway for MagnetApi {
     92     async fn transfer(&self, req: TransferRequest) -> ApiResult<TransferResponse> {
     93         let creditor = FullHuPayto::try_from(&req.credit_account)?;
     94         let result = db::make_transfer(
     95             &self.pool,
     96             &Transfer {
     97                 request_uid: req.request_uid,
     98                 wtid: req.wtid,
     99                 amount: req.amount.decimal(),
    100                 metadata: req.metadata,
    101                 creditor,
    102                 exchange_base_url: req.exchange_base_url,
    103             },
    104             &Timestamp::now(),
    105         )
    106         .await?;
    107         match result {
    108             db::TransferResult::Success { id, initiated_at } => Ok(TransferResponse {
    109                 timestamp: initiated_at.into(),
    110                 row_id: SafeU64::try_from(id).unwrap(),
    111             }),
    112             db::TransferResult::RequestUidReuse => {
    113                 Err(failure_code(ErrorCode::BANK_TRANSFER_REQUEST_UID_REUSED))
    114             }
    115             db::TransferResult::WtidReuse => {
    116                 Err(failure_code(ErrorCode::BANK_TRANSFER_WTID_REUSED))
    117             }
    118         }
    119     }
    120 
    121     async fn transfer_page(
    122         &self,
    123         page: Page,
    124         status: Option<TransferState>,
    125     ) -> ApiResult<TransferList> {
    126         Ok(TransferList {
    127             transfers: db::transfer_page(&self.pool, &status, &page).await?,
    128             debit_account: self.payto.clone(),
    129         })
    130     }
    131 
    132     async fn transfer_by_id(&self, id: u64) -> ApiResult<Option<TransferStatus>> {
    133         Ok(db::transfer_by_id(&self.pool, id).await?)
    134     }
    135 
    136     async fn outgoing_history(&self, params: History) -> ApiResult<OutgoingHistory> {
    137         Ok(OutgoingHistory {
    138             outgoing_transactions: db::outgoing_history(&self.pool, &params, || {
    139                 self.taler_out_channel.subscribe()
    140             })
    141             .await?,
    142             debit_account: self.payto.clone(),
    143         })
    144     }
    145 
    146     async fn incoming_history(&self, params: History) -> ApiResult<IncomingHistory> {
    147         Ok(IncomingHistory {
    148             incoming_transactions: db::incoming_history(&self.pool, &params, || {
    149                 self.taler_in_channel.subscribe()
    150             })
    151             .await?,
    152             credit_account: self.payto.clone(),
    153         })
    154     }
    155 
    156     async fn add_incoming_reserve(
    157         &self,
    158         req: AddIncomingRequest,
    159     ) -> ApiResult<AddIncomingResponse> {
    160         let debtor = FullHuPayto::try_from(&req.debit_account)?;
    161         let res = db::register_tx_in_admin(
    162             &self.pool,
    163             &TxInAdmin {
    164                 amount: req.amount,
    165                 subject: format!("Admin incoming {}", req.reserve_pub),
    166                 debtor,
    167                 metadata: IncomingSubject::Reserve(req.reserve_pub),
    168             },
    169             &Timestamp::now(),
    170         )
    171         .await?;
    172         match res {
    173             AddIncomingResult::Success {
    174                 row_id, valued_at, ..
    175             } => Ok(AddIncomingResponse {
    176                 row_id: safe_u64(row_id),
    177                 timestamp: date_to_utc_ts(&valued_at).into(),
    178             }),
    179             AddIncomingResult::ReservePubReuse => {
    180                 Err(failure_code(ErrorCode::BANK_DUPLICATE_RESERVE_PUB_SUBJECT))
    181             }
    182             AddIncomingResult::UnknownMapping | AddIncomingResult::MappingReuse => {
    183                 unreachable!("mapping not used")
    184             }
    185         }
    186     }
    187 
    188     async fn add_incoming_kyc(&self, req: AddKycauthRequest) -> ApiResult<AddKycauthResponse> {
    189         let debtor = FullHuPayto::try_from(&req.debit_account)?;
    190         let res = db::register_tx_in_admin(
    191             &self.pool,
    192             &TxInAdmin {
    193                 amount: req.amount,
    194                 subject: format!("Admin incoming KYC:{}", req.account_pub),
    195                 debtor,
    196                 metadata: IncomingSubject::Kyc(req.account_pub),
    197             },
    198             &Timestamp::now(),
    199         )
    200         .await?;
    201         match res {
    202             AddIncomingResult::Success {
    203                 row_id, valued_at, ..
    204             } => Ok(AddKycauthResponse {
    205                 row_id: safe_u64(row_id),
    206                 timestamp: date_to_utc_ts(&valued_at).into(),
    207             }),
    208             AddIncomingResult::ReservePubReuse => {
    209                 Err(failure_code(ErrorCode::BANK_DUPLICATE_RESERVE_PUB_SUBJECT))
    210             }
    211             AddIncomingResult::UnknownMapping | AddIncomingResult::MappingReuse => {
    212                 unreachable!("mapping not used")
    213             }
    214         }
    215     }
    216 
    217     fn support_account_check(&self) -> bool {
    218         false
    219     }
    220 }
    221 
    222 impl Revenue for MagnetApi {
    223     async fn history(&self, params: History) -> ApiResult<RevenueIncomingHistory> {
    224         Ok(RevenueIncomingHistory {
    225             incoming_transactions: db::revenue_history(&self.pool, &params, || {
    226                 self.in_channel.subscribe()
    227             })
    228             .await?,
    229             credit_account: self.payto.clone(),
    230         })
    231     }
    232 }
    233 
    234 impl WireTransferGateway for MagnetApi {
    235     fn supported_formats(&self) -> &[SubjectFormat] {
    236         &[SubjectFormat::SIMPLE]
    237     }
    238 
    239     async fn registration(&self, req: RegistrationRequest) -> ApiResult<RegistrationResponse> {
    240         match db::transfer_register(&self.pool, &req).await? {
    241             db::RegistrationResult::Success => {
    242                 let simple = TransferSubject::Simple {
    243                     credit_amount: req.credit_amount,
    244                     subject: if req.authorization_pub == req.account_pub && !req.recurrent {
    245                         fmt_in_subject(req.r#type.into(), &req.account_pub)
    246                     } else {
    247                         fmt_in_subject(IncomingType::map, &req.authorization_pub)
    248                     },
    249                 };
    250                 ApiResult::Ok(RegistrationResponse {
    251                     subjects: vec![simple],
    252                     expiration: TalerTimestamp::Never,
    253                 })
    254             }
    255             db::RegistrationResult::ReservePubReuse => {
    256                 ApiResult::Err(failure_code(ErrorCode::BANK_DUPLICATE_RESERVE_PUB_SUBJECT))
    257             }
    258         }
    259     }
    260 
    261     async fn unregistration(&self, req: Unregistration) -> ApiResult<()> {
    262         if !db::transfer_unregister(&self.pool, &req).await? {
    263             Err(failure(
    264                 ErrorCode::BANK_TRANSACTION_NOT_FOUND,
    265                 format!("Prepared transfer '{}' not found", req.authorization_pub),
    266             ))
    267         } else {
    268             Ok(())
    269         }
    270     }
    271 }
    272 
    273 #[cfg(test)]
    274 mod test {
    275 
    276     use std::sync::{Arc, LazyLock};
    277 
    278     use crate::{
    279         FullHuPayto,
    280         api::MagnetApi,
    281         constants::CONFIG_SOURCE,
    282         db::{self, AddIncomingResult, TxIn, TxOutKind},
    283         magnet_api::types::TxStatus,
    284         magnet_payto,
    285     };
    286 
    287     use jiff::{Timestamp, Zoned};
    288     use sqlx::{PgPool, Row as _, postgres::PgRow};
    289     use taler_api::{
    290         api::TalerRouter as _,
    291         auth::AuthMethod,
    292         db::TypeHelper as _,
    293         subject::{IncomingSubject, OutgoingSubject},
    294     };
    295     use taler_common::{
    296         api_common::EddsaPublicKey,
    297         api_revenue::RevenueConfig,
    298         api_transfer::WireTransferConfig,
    299         api_wire::{OutgoingHistory, TransferState, WireConfig},
    300         db::IncomingType,
    301         types::{
    302             amount::amount,
    303             payto::{PaytoURI, payto},
    304         },
    305     };
    306     use taler_test_utils::{
    307         Router,
    308         db::db_test_setup,
    309         routine::{
    310             Status, admin_add_incoming_routine, registration_routine, revenue_routine,
    311             routine_pagination, transfer_routine,
    312         },
    313         server::TestServer,
    314     };
    315 
    316     static PAYTO: LazyLock<FullHuPayto> = LazyLock::new(|| {
    317         magnet_payto("payto://iban/HU02162000031000164800000000?receiver-name=name")
    318     });
    319     static ACCOUNT: LazyLock<PaytoURI> = LazyLock::new(|| PAYTO.as_payto());
    320 
    321     async fn setup() -> (Router, PgPool) {
    322         let (_, pool) = db_test_setup(CONFIG_SOURCE).await;
    323         let api = Arc::new(MagnetApi::start(pool.clone(), ACCOUNT.clone()).await);
    324         let server = Router::new()
    325             .wire_gateway(api.clone(), AuthMethod::None)
    326             .wire_transfer_gateway(api.clone())
    327             .revenue(api, AuthMethod::None)
    328             .finalize();
    329 
    330         (server, pool)
    331     }
    332 
    333     #[tokio::test]
    334     async fn config() {
    335         let (server, _) = setup().await;
    336         server
    337             .get("/taler-wire-gateway/config")
    338             .await
    339             .assert_ok_json::<WireConfig>();
    340         server
    341             .get("/taler-wire-transfer-gateway/config")
    342             .await
    343             .assert_ok_json::<WireTransferConfig>();
    344         server
    345             .get("/taler-revenue/config")
    346             .await
    347             .assert_ok_json::<RevenueConfig>();
    348     }
    349 
    350     #[tokio::test]
    351     async fn transfer() {
    352         let (server, _) = setup().await;
    353         transfer_routine(
    354             &server,
    355             TransferState::pending,
    356             &payto("payto://iban/HU02162000031000164800000000?receiver-name=name"),
    357         )
    358         .await;
    359     }
    360 
    361     #[tokio::test]
    362     async fn outgoing_history() {
    363         let (server, pool) = setup().await;
    364         routine_pagination::<OutgoingHistory, _>(
    365             &server,
    366             "/taler-wire-gateway/history/outgoing",
    367             async |i| {
    368                 let mut conn = pool.acquire().await.unwrap();
    369                 let now = Zoned::now().date();
    370                 db::register_tx_out(
    371                     &mut conn,
    372                     &db::TxOut {
    373                         code: i as u64,
    374                         amount: amount("EUR:10"),
    375                         subject: "subject".into(),
    376                         creditor: PAYTO.clone(),
    377                         value_date: now,
    378                         status: TxStatus::Completed,
    379                     },
    380                     &TxOutKind::Talerable(OutgoingSubject::rand()),
    381                     &Timestamp::now(),
    382                 )
    383                 .await
    384                 .unwrap();
    385             },
    386         )
    387         .await;
    388     }
    389 
    390     #[tokio::test]
    391     async fn admin_add_incoming() {
    392         let (server, _) = setup().await;
    393         admin_add_incoming_routine(&server, &ACCOUNT, true).await;
    394     }
    395 
    396     #[tokio::test]
    397     async fn revenue() {
    398         let (server, _) = setup().await;
    399         revenue_routine(&server, &ACCOUNT, true).await;
    400     }
    401 
    402     async fn check_in(pool: &PgPool) -> Vec<Status> {
    403         sqlx::query(
    404             "
    405             SELECT pending_recurrent_in.authorization_pub IS NOT NULL, initiated_id IS NOT NULL, type, metadata 
    406             FROM tx_in
    407                 LEFT JOIN taler_in USING (tx_in_id)
    408                 LEFT JOIN pending_recurrent_in USING (tx_in_id)
    409                 LEFT JOIN bounced USING (tx_in_id)
    410             ORDER BY tx_in.tx_in_id
    411         ",
    412         )
    413         .try_map(|r: PgRow| {
    414             Ok(
    415                 if r.try_get_flag(0)? {
    416                     Status::Pending
    417                 } else if r.try_get_flag(1)? {
    418                     Status::Bounced
    419                 } else {
    420                     match r.try_get(2)? {
    421                         None => Status::Simple,
    422                         Some(IncomingType::reserve) => Status::Reserve(r.try_get(3)?),
    423                         Some(IncomingType::kyc) => Status::Kyc(r.try_get(3)?),
    424                         Some(e) => unreachable!("{e:?}")
    425                     }
    426                 }
    427             )
    428         })
    429         .fetch_all(pool)
    430         .await
    431         .unwrap()
    432     }
    433 
    434     pub async fn test_in(pool: &PgPool, key: EddsaPublicKey) {
    435         let tx = TxIn {
    436             code: rand::random_range(10..10000),
    437             amount: amount("EUR:12"),
    438             subject: Box::default(),
    439             debtor: PAYTO.clone(),
    440             value_date: Zoned::now().date(),
    441             status: TxStatus::Completed,
    442         };
    443         let mut db = pool.acquire().await.unwrap();
    444         let reason = match db::register_tx_in(
    445             &mut db,
    446             &tx,
    447             &Some(IncomingSubject::Map(key)),
    448             &Timestamp::now(),
    449         )
    450         .await
    451         .unwrap()
    452         {
    453             AddIncomingResult::Success { .. } => return,
    454             AddIncomingResult::ReservePubReuse => "reserve pub reuse",
    455             AddIncomingResult::UnknownMapping => "unknown mapping",
    456             AddIncomingResult::MappingReuse => "mapping reuse",
    457         };
    458         db::register_bounce_tx_in(&mut db, &tx, reason, &Timestamp::now())
    459             .await
    460             .unwrap();
    461     }
    462 
    463     #[tokio::test]
    464     async fn registration() {
    465         let (server, pool) = setup().await;
    466         registration_routine(
    467             &server,
    468             &ACCOUNT,
    469             || check_in(&pool),
    470             |account_pub| {
    471                 let account_pub = account_pub.clone();
    472                 let pool = &pool;
    473                 async move { test_in(pool, account_pub).await }
    474             },
    475         )
    476         .await;
    477     }
    478 }