taler-rust

GNU Taler code in Rust. Largely core banking integrations.
Log | Files | Refs | Submodules | README | LICENSE

api.rs (16766B)


      1 /*
      2   This file is part of TALER
      3   Copyright (C) 2025, 2026 Taler Systems SA
      4 
      5   TALER is free software; you can redistribute it and/or modify it under the
      6   terms of the GNU Affero General Public License as published by the Free Software
      7   Foundation; either version 3, or (at your option) any later version.
      8 
      9   TALER is distributed in the hope that it will be useful, but WITHOUT ANY
     10   WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
     11   A PARTICULAR PURPOSE.  See the GNU Affero General Public License for more details.
     12 
     13   You should have received a copy of the GNU Affero General Public License along with
     14   TALER; see the file COPYING.  If not, see <http://www.gnu.org/licenses/>
     15 */
     16 
     17 use compact_str::CompactString;
     18 use jiff::Timestamp;
     19 use taler_api::{
     20     api::{TalerApi, revenue::Revenue, transfer::WireTransferGateway, wire::WireGateway},
     21     error::{ApiResult, failure, failure_code},
     22     subject::{IncomingSubject, fmt_in_subject},
     23 };
     24 use taler_common::{
     25     api_common::{SafeU64, safe_u64},
     26     api_params::{History, Page},
     27     api_revenue::RevenueIncomingHistory,
     28     api_transfer::{
     29         RegistrationRequest, RegistrationResponse, SubjectFormat, TransferSubject, Unregistration,
     30     },
     31     api_wire::{
     32         AddIncomingRequest, AddIncomingResponse, AddKycauthRequest, AddKycauthResponse,
     33         IncomingHistory, OutgoingHistory, TransferList, TransferRequest, TransferResponse,
     34         TransferState, TransferStatus,
     35     },
     36     db::IncomingType,
     37     error_code::ErrorCode,
     38     types::{amount::Currency, payto::PaytoURI, timestamp::TalerTimestamp},
     39 };
     40 use tokio::sync::watch::Sender;
     41 
     42 use crate::{
     43     db::{self, AddIncomingResult, Transfer, TxInAdmin},
     44     payto::FullCyclosPayto,
     45 };
     46 
     47 pub struct CyclosApi {
     48     pub pool: sqlx::PgPool,
     49     pub currency: Currency,
     50     pub payto: PaytoURI,
     51     pub in_channel: Sender<i64>,
     52     pub taler_in_channel: Sender<i64>,
     53     pub out_channel: Sender<i64>,
     54     pub taler_out_channel: Sender<i64>,
     55     pub root: CompactString,
     56 }
     57 
     58 impl CyclosApi {
     59     pub async fn start(
     60         pool: sqlx::PgPool,
     61         root: CompactString,
     62         payto: PaytoURI,
     63         currency: Currency,
     64     ) -> Self {
     65         let in_channel = Sender::new(0);
     66         let taler_in_channel = Sender::new(0);
     67         let out_channel = Sender::new(0);
     68         let taler_out_channel = Sender::new(0);
     69         let tmp = Self {
     70             pool: pool.clone(),
     71             payto,
     72             currency,
     73             root,
     74             in_channel: in_channel.clone(),
     75             taler_in_channel: taler_in_channel.clone(),
     76             out_channel: out_channel.clone(),
     77             taler_out_channel: taler_out_channel.clone(),
     78         };
     79         tokio::spawn(db::notification_listener(
     80             pool,
     81             in_channel,
     82             taler_in_channel,
     83             out_channel,
     84             taler_out_channel,
     85         ));
     86         tmp
     87     }
     88 }
     89 
     90 impl TalerApi for CyclosApi {
     91     fn currency(&self) -> &str {
     92         self.currency.as_ref()
     93     }
     94 
     95     fn implementation(&self) -> &'static str {
     96         "urn:net:taler:specs:taler-cyclos:taler-rust"
     97     }
     98 }
     99 
    100 impl WireGateway for CyclosApi {
    101     async fn transfer(&self, req: TransferRequest) -> ApiResult<TransferResponse> {
    102         let creditor = FullCyclosPayto::try_from(&req.credit_account)?;
    103         let result = db::make_transfer(
    104             &self.pool,
    105             &Transfer {
    106                 request_uid: req.request_uid,
    107                 amount: req.amount.decimal(),
    108                 exchange_base_url: req.exchange_base_url,
    109                 metadata: req.metadata,
    110                 wtid: req.wtid,
    111                 creditor_id: *creditor.id,
    112                 creditor_name: creditor.name,
    113             },
    114             &Timestamp::now(),
    115         )
    116         .await?;
    117         match result {
    118             db::TransferResult::Success { id, initiated_at } => Ok(TransferResponse {
    119                 timestamp: initiated_at.into(),
    120                 row_id: SafeU64::try_from(id).unwrap(),
    121             }),
    122             db::TransferResult::RequestUidReuse => {
    123                 Err(failure_code(ErrorCode::BANK_TRANSFER_REQUEST_UID_REUSED))
    124             }
    125             db::TransferResult::WtidReuse => {
    126                 Err(failure_code(ErrorCode::BANK_TRANSFER_WTID_REUSED))
    127             }
    128         }
    129     }
    130 
    131     async fn transfer_page(
    132         &self,
    133         page: Page,
    134         status: Option<TransferState>,
    135     ) -> ApiResult<TransferList> {
    136         Ok(TransferList {
    137             transfers: db::transfer_page(&self.pool, &status, &self.currency, &self.root, &page)
    138                 .await?,
    139             debit_account: self.payto.clone(),
    140         })
    141     }
    142 
    143     async fn transfer_by_id(&self, id: u64) -> ApiResult<Option<TransferStatus>> {
    144         Ok(db::transfer_by_id(&self.pool, id, &self.currency, &self.root).await?)
    145     }
    146 
    147     async fn outgoing_history(&self, params: History) -> ApiResult<OutgoingHistory> {
    148         Ok(OutgoingHistory {
    149             outgoing_transactions: db::outgoing_history(
    150                 &self.pool,
    151                 &params,
    152                 &self.currency,
    153                 &self.root,
    154                 || self.taler_out_channel.subscribe(),
    155             )
    156             .await?,
    157             debit_account: self.payto.clone(),
    158         })
    159     }
    160 
    161     async fn incoming_history(&self, params: History) -> ApiResult<IncomingHistory> {
    162         Ok(IncomingHistory {
    163             incoming_transactions: db::incoming_history(
    164                 &self.pool,
    165                 &params,
    166                 &self.currency,
    167                 &self.root,
    168                 || self.taler_in_channel.subscribe(),
    169             )
    170             .await?,
    171             credit_account: self.payto.clone(),
    172         })
    173     }
    174 
    175     async fn add_incoming_reserve(
    176         &self,
    177         req: AddIncomingRequest,
    178     ) -> ApiResult<AddIncomingResponse> {
    179         let debtor = FullCyclosPayto::try_from(&req.debit_account)?;
    180         let res = db::register_tx_in_admin(
    181             &self.pool,
    182             &TxInAdmin {
    183                 amount: req.amount.decimal(),
    184                 subject: format!("Admin incoming {}", req.reserve_pub),
    185                 debtor_id: *debtor.id,
    186                 debtor_name: debtor.name,
    187                 metadata: IncomingSubject::Reserve(req.reserve_pub),
    188             },
    189             &Timestamp::now(),
    190         )
    191         .await?;
    192         match res {
    193             AddIncomingResult::Success {
    194                 row_id, valued_at, ..
    195             } => Ok(AddIncomingResponse {
    196                 row_id: safe_u64(row_id),
    197                 timestamp: valued_at.into(),
    198             }),
    199             AddIncomingResult::ReservePubReuse => {
    200                 Err(failure_code(ErrorCode::BANK_DUPLICATE_RESERVE_PUB_SUBJECT))
    201             }
    202             AddIncomingResult::UnknownMapping | AddIncomingResult::MappingReuse => {
    203                 unreachable!("mapping unused")
    204             }
    205         }
    206     }
    207 
    208     async fn add_incoming_kyc(&self, req: AddKycauthRequest) -> ApiResult<AddKycauthResponse> {
    209         let debtor = FullCyclosPayto::try_from(&req.debit_account)?;
    210         let res = db::register_tx_in_admin(
    211             &self.pool,
    212             &TxInAdmin {
    213                 amount: req.amount.decimal(),
    214                 subject: format!("Admin incoming KYC:{}", req.account_pub),
    215                 debtor_id: *debtor.id,
    216                 debtor_name: debtor.name,
    217                 metadata: IncomingSubject::Kyc(req.account_pub),
    218             },
    219             &Timestamp::now(),
    220         )
    221         .await?;
    222         match res {
    223             AddIncomingResult::Success {
    224                 row_id, valued_at, ..
    225             } => Ok(AddKycauthResponse {
    226                 row_id: safe_u64(row_id),
    227                 timestamp: valued_at.into(),
    228             }),
    229             AddIncomingResult::ReservePubReuse => {
    230                 Err(failure_code(ErrorCode::BANK_DUPLICATE_RESERVE_PUB_SUBJECT))
    231             }
    232             AddIncomingResult::UnknownMapping | AddIncomingResult::MappingReuse => {
    233                 unreachable!("mapping unused")
    234             }
    235         }
    236     }
    237 
    238     fn support_account_check(&self) -> bool {
    239         false
    240     }
    241 }
    242 
    243 impl Revenue for CyclosApi {
    244     async fn history(&self, params: History) -> ApiResult<RevenueIncomingHistory> {
    245         Ok(RevenueIncomingHistory {
    246             incoming_transactions: db::revenue_history(
    247                 &self.pool,
    248                 &params,
    249                 &self.currency,
    250                 &self.root,
    251                 || self.in_channel.subscribe(),
    252             )
    253             .await?,
    254             credit_account: self.payto.clone(),
    255         })
    256     }
    257 }
    258 
    259 impl WireTransferGateway for CyclosApi {
    260     fn supported_formats(&self) -> &[SubjectFormat] {
    261         &[SubjectFormat::SIMPLE]
    262     }
    263 
    264     async fn registration(&self, req: RegistrationRequest) -> ApiResult<RegistrationResponse> {
    265         match db::transfer_register(&self.pool, &req).await? {
    266             db::RegistrationResult::Success => {
    267                 let simple = TransferSubject::Simple {
    268                     credit_amount: req.credit_amount,
    269                     subject: if req.authorization_pub == req.account_pub && !req.recurrent {
    270                         fmt_in_subject(req.r#type.into(), &req.account_pub)
    271                     } else {
    272                         fmt_in_subject(IncomingType::map, &req.authorization_pub)
    273                     },
    274                 };
    275                 ApiResult::Ok(RegistrationResponse {
    276                     subjects: vec![simple],
    277                     expiration: TalerTimestamp::Never,
    278                 })
    279             }
    280             db::RegistrationResult::ReservePubReuse => {
    281                 ApiResult::Err(failure_code(ErrorCode::BANK_DUPLICATE_RESERVE_PUB_SUBJECT))
    282             }
    283         }
    284     }
    285 
    286     async fn unregistration(&self, req: Unregistration) -> ApiResult<()> {
    287         if !db::transfer_unregister(&self.pool, &req).await? {
    288             Err(failure(
    289                 ErrorCode::BANK_TRANSACTION_NOT_FOUND,
    290                 format!("Prepared transfer '{}' not found", req.authorization_pub),
    291             ))
    292         } else {
    293             Ok(())
    294         }
    295     }
    296 }
    297 
    298 #[cfg(test)]
    299 mod test {
    300     use std::{
    301         str::FromStr as _,
    302         sync::{Arc, LazyLock},
    303     };
    304 
    305     use compact_str::CompactString;
    306     use jiff::Timestamp;
    307     use sqlx::{PgPool, Row as _, postgres::PgRow};
    308     use taler_api::{
    309         api::TalerRouter as _,
    310         auth::AuthMethod,
    311         db::TypeHelper as _,
    312         subject::{IncomingSubject, OutgoingSubject},
    313     };
    314     use taler_common::{
    315         api_common::EddsaPublicKey,
    316         api_revenue::RevenueConfig,
    317         api_transfer::WireTransferConfig,
    318         api_wire::{OutgoingHistory, TransferState, WireConfig},
    319         db::IncomingType,
    320         types::{
    321             amount::{Currency, decimal},
    322             payto::{PaytoURI, payto},
    323         },
    324     };
    325     use taler_test_utils::{
    326         Router,
    327         db::db_test_setup,
    328         routine::{
    329             Status, admin_add_incoming_routine, registration_routine, revenue_routine,
    330             routine_pagination, transfer_routine,
    331         },
    332         server::TestServer as _,
    333     };
    334 
    335     use crate::{
    336         api::CyclosApi,
    337         constants::CONFIG_SOURCE,
    338         db::{self, AddIncomingResult, TxIn, TxOutKind},
    339     };
    340 
    341     static ACCOUNT: LazyLock<PaytoURI> =
    342         LazyLock::new(|| payto("payto://cyclos/localhost/7762070814178012479?receiver-name=name"));
    343 
    344     async fn setup() -> (Router, PgPool) {
    345         let (_, pool) = db_test_setup(CONFIG_SOURCE).await;
    346         let api = Arc::new(
    347             CyclosApi::start(
    348                 pool.clone(),
    349                 CompactString::const_new("localhost"),
    350                 ACCOUNT.clone(),
    351                 Currency::from_str("TEST").unwrap(),
    352             )
    353             .await,
    354         );
    355         let server = Router::new()
    356             .wire_gateway(api.clone(), AuthMethod::None)
    357             .wire_transfer_gateway(api.clone())
    358             .revenue(api, AuthMethod::None)
    359             .finalize();
    360 
    361         (server, pool)
    362     }
    363 
    364     #[tokio::test]
    365     async fn config() {
    366         let (server, _) = setup().await;
    367         server
    368             .get("/taler-wire-gateway/config")
    369             .await
    370             .assert_ok_json::<WireConfig>();
    371         server
    372             .get("/taler-wire-transfer-gateway/config")
    373             .await
    374             .assert_ok_json::<WireTransferConfig>();
    375         server
    376             .get("/taler-revenue/config")
    377             .await
    378             .assert_ok_json::<RevenueConfig>();
    379     }
    380 
    381     #[tokio::test]
    382     async fn transfer() {
    383         let (server, _) = setup().await;
    384         transfer_routine(&server, TransferState::pending, &ACCOUNT).await;
    385     }
    386 
    387     #[tokio::test]
    388     async fn outgoing_history() {
    389         let (server, pool) = setup().await;
    390         routine_pagination::<OutgoingHistory, _>(
    391             &server,
    392             "/taler-wire-gateway/history/outgoing",
    393             async |i| {
    394                 db::register_tx_out(
    395                     &mut pool.acquire().await.unwrap(),
    396                     &db::TxOut {
    397                         transfer_id: i as i64,
    398                         tx_id: if i % 2 == 0 {
    399                             Some((i % 2) as i64)
    400                         } else {
    401                             None
    402                         },
    403                         amount: decimal("10"),
    404                         subject: "subject".to_owned(),
    405                         creditor_id: 31000163100000000,
    406                         creditor_name: "Name".into(),
    407                         valued_at: Timestamp::now(),
    408                     },
    409                     &TxOutKind::Talerable(OutgoingSubject::rand()),
    410                     &Timestamp::now(),
    411                 )
    412                 .await
    413                 .unwrap();
    414             },
    415         )
    416         .await;
    417     }
    418 
    419     #[tokio::test]
    420     async fn admin_add_incoming() {
    421         let (server, _) = setup().await;
    422         admin_add_incoming_routine(&server, &ACCOUNT, true).await;
    423     }
    424 
    425     #[tokio::test]
    426     async fn revenue() {
    427         let (server, _) = setup().await;
    428         revenue_routine(&server, &ACCOUNT, true).await;
    429     }
    430 
    431     async fn check_in(pool: &PgPool) -> Vec<Status> {
    432         sqlx::query(
    433             "
    434             SELECT pending_recurrent_in.authorization_pub IS NOT NULL, bounced.tx_in_id IS NOT NULL, type, metadata 
    435             FROM tx_in
    436                 LEFT JOIN taler_in USING (tx_in_id)
    437                 LEFT JOIN pending_recurrent_in USING (tx_in_id)
    438                 LEFT JOIN bounced USING (tx_in_id)
    439             ORDER BY tx_in.tx_in_id
    440         ",
    441         )
    442         .try_map(|r: PgRow| {
    443             Ok(
    444                 if r.try_get_flag(0)? {
    445                     Status::Pending
    446                 } else if r.try_get_flag(1)? {
    447                     Status::Bounced
    448                 } else {
    449                     match r.try_get(2)? {
    450                         None => Status::Simple,
    451                         Some(IncomingType::reserve) => Status::Reserve(r.try_get(3)?),
    452                         Some(IncomingType::kyc) => Status::Kyc(r.try_get(3)?),
    453                         Some(e) => unreachable!("{e:?}")
    454                     }
    455                 }
    456             )
    457         })
    458         .fetch_all(pool)
    459         .await
    460         .unwrap()
    461     }
    462 
    463     pub async fn test_in(pool: &PgPool, key: EddsaPublicKey) {
    464         let tx = TxIn {
    465             transfer_id: rand::random_range(10..10000),
    466             tx_id: None,
    467             amount: decimal("12"),
    468             subject: String::new(),
    469             debtor_id: rand::random_range(10..10000),
    470             debtor_name: "Name".into(),
    471             valued_at: Timestamp::now(),
    472         };
    473         let mut db = pool.acquire().await.unwrap();
    474         let reason = match db::register_tx_in(
    475             &mut db,
    476             &tx,
    477             &Some(IncomingSubject::Map(key)),
    478             &Timestamp::now(),
    479         )
    480         .await
    481         .unwrap()
    482         {
    483             AddIncomingResult::Success { .. } => return,
    484             AddIncomingResult::ReservePubReuse => "reserve pub reuse",
    485             AddIncomingResult::UnknownMapping => "unknown mapping",
    486             AddIncomingResult::MappingReuse => "mapping reuse",
    487         };
    488         db::register_bounced_tx_in(
    489             &mut db,
    490             &tx,
    491             rand::random_range(10..10000),
    492             reason,
    493             &Timestamp::now(),
    494         )
    495         .await
    496         .unwrap();
    497     }
    498 
    499     #[tokio::test]
    500     async fn registration() {
    501         let (server, pool) = setup().await;
    502         registration_routine(
    503             &server,
    504             &ACCOUNT,
    505             || check_in(&pool),
    506             |account_pub| {
    507                 let account_pub = account_pub.clone();
    508                 let pool = &pool;
    509                 async move { test_in(pool, account_pub).await }
    510             },
    511         )
    512         .await;
    513     }
    514 }