taler-rust

GNU Taler code in Rust. Largely core banking integrations.
Log | Files | Refs | Submodules | README | LICENSE

api.rs (17482B)


      1 /*
      2   This file is part of TALER
      3   Copyright (C) 2025, 2026 Taler Systems SA
      4 
      5   TALER is free software; you can redistribute it and/or modify it under the
      6   terms of the GNU Affero General Public License as published by the Free Software
      7   Foundation; either version 3, or (at your option) any later version.
      8 
      9   TALER is distributed in the hope that it will be useful, but WITHOUT ANY
     10   WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
     11   A PARTICULAR PURPOSE.  See the GNU Affero General Public License for more details.
     12 
     13   You should have received a copy of the GNU Affero General Public License along with
     14   TALER; see the file COPYING.  If not, see <http://www.gnu.org/licenses/>
     15 */
     16 
     17 use jiff::Timestamp;
     18 use taler_api::{
     19     api::{TalerApi, prepared::PreparedTransfer, revenue::Revenue, wire::WireGateway},
     20     error::{ApiResult, failure_code},
     21     subject::{IncomingSubject, fmt_in_subject},
     22 };
     23 use taler_common::{
     24     api::{
     25         params::{History, Page},
     26         prepared::{
     27             RegistrationRequest, RegistrationResponse, SubjectFormat, TransferSubject,
     28             Unregistration,
     29         },
     30         revenue::RevenueIncomingHistory,
     31         wire::{
     32             AddIncomingRequest, AddIncomingResponse, AddKycauthRequest, AddMappedRequest,
     33             IncomingHistory, OutgoingHistory, TransferList, TransferRequest, TransferResponse,
     34             TransferState, TransferStatus,
     35         },
     36     },
     37     db::IncomingType,
     38     error_code::ErrorCode,
     39     types::{amount::Currency, payto::PaytoURI, timestamp::TalerTimestamp, utils::date_to_utc_ts},
     40 };
     41 use tokio::sync::watch::Sender;
     42 
     43 use crate::{
     44     FullHuPayto,
     45     constants::CURR,
     46     db::{self, AddIncomingResult, Transfer, TxInAdmin},
     47 };
     48 
     49 pub struct MagnetApi {
     50     pub pool: sqlx::PgPool,
     51     pub payto: PaytoURI,
     52     pub in_channel: Sender<i64>,
     53     pub taler_in_channel: Sender<i64>,
     54     pub out_channel: Sender<i64>,
     55     pub taler_out_channel: Sender<i64>,
     56 }
     57 
     58 impl MagnetApi {
     59     pub async fn start(pool: sqlx::PgPool, payto: PaytoURI) -> Self {
     60         let in_channel = Sender::new(0);
     61         let taler_in_channel = Sender::new(0);
     62         let out_channel = Sender::new(0);
     63         let taler_out_channel = Sender::new(0);
     64         let tmp = Self {
     65             pool: pool.clone(),
     66             payto,
     67             in_channel: in_channel.clone(),
     68             taler_in_channel: taler_in_channel.clone(),
     69             out_channel: out_channel.clone(),
     70             taler_out_channel: taler_out_channel.clone(),
     71         };
     72         tokio::spawn(db::notification_listener(
     73             pool,
     74             in_channel,
     75             taler_in_channel,
     76             out_channel,
     77             taler_out_channel,
     78         ));
     79         tmp
     80     }
     81 }
     82 
     83 impl TalerApi for MagnetApi {
     84     fn currency(&self) -> Currency {
     85         CURR
     86     }
     87 
     88     fn implementation(&self) -> &'static str {
     89         "urn:net:taler:specs:taler-magnet-bank:taler-rust"
     90     }
     91 }
     92 
     93 impl WireGateway for MagnetApi {
     94     async fn transfer(&self, req: TransferRequest) -> ApiResult<TransferResponse> {
     95         let creditor = FullHuPayto::try_from(&req.credit_account)?;
     96         let result = db::make_transfer(
     97             &self.pool,
     98             &Transfer {
     99                 request_uid: req.request_uid,
    100                 wtid: req.wtid,
    101                 amount: req.amount.decimal(),
    102                 metadata: req.metadata,
    103                 creditor,
    104                 exchange_base_url: req.exchange_base_url,
    105             },
    106             &Timestamp::now(),
    107         )
    108         .await?;
    109         match result {
    110             db::TransferResult::Success { id, initiated_at } => Ok(TransferResponse {
    111                 timestamp: initiated_at.into(),
    112                 row_id: id,
    113             }),
    114             db::TransferResult::RequestUidReuse => {
    115                 Err(failure_code(ErrorCode::BANK_TRANSFER_REQUEST_UID_REUSED))
    116             }
    117             db::TransferResult::WtidReuse => {
    118                 Err(failure_code(ErrorCode::BANK_TRANSFER_WTID_REUSED))
    119             }
    120         }
    121     }
    122 
    123     async fn transfer_page(
    124         &self,
    125         page: Page,
    126         status: Option<TransferState>,
    127     ) -> ApiResult<TransferList> {
    128         Ok(TransferList {
    129             transfers: db::transfer_page(&self.pool, &status, &page).await?,
    130             debit_account: self.payto.clone(),
    131         })
    132     }
    133 
    134     async fn transfer_by_id(&self, id: u64) -> ApiResult<Option<TransferStatus>> {
    135         Ok(db::transfer_by_id(&self.pool, id).await?)
    136     }
    137 
    138     async fn outgoing_history(&self, params: History) -> ApiResult<OutgoingHistory> {
    139         Ok(OutgoingHistory {
    140             outgoing_transactions: db::outgoing_history(&self.pool, &params, || {
    141                 self.taler_out_channel.subscribe()
    142             })
    143             .await?,
    144             debit_account: self.payto.clone(),
    145         })
    146     }
    147 
    148     async fn incoming_history(&self, params: History) -> ApiResult<IncomingHistory> {
    149         Ok(IncomingHistory {
    150             incoming_transactions: db::incoming_history(&self.pool, &params, || {
    151                 self.taler_in_channel.subscribe()
    152             })
    153             .await?,
    154             credit_account: self.payto.clone(),
    155         })
    156     }
    157 
    158     async fn add_incoming_reserve(
    159         &self,
    160         req: AddIncomingRequest,
    161     ) -> ApiResult<AddIncomingResponse> {
    162         let debtor = FullHuPayto::try_from(&req.debit_account)?;
    163         let res = db::register_tx_in_admin(
    164             &self.pool,
    165             &TxInAdmin {
    166                 amount: req.amount,
    167                 subject: format!("Admin incoming {}", req.reserve_pub),
    168                 debtor,
    169                 metadata: IncomingSubject::Reserve(req.reserve_pub),
    170             },
    171             &Timestamp::now(),
    172         )
    173         .await?;
    174         match res {
    175             AddIncomingResult::Success {
    176                 row_id, valued_at, ..
    177             } => Ok(AddIncomingResponse {
    178                 row_id,
    179                 timestamp: date_to_utc_ts(&valued_at).into(),
    180             }),
    181             AddIncomingResult::ReservePubReuse => {
    182                 Err(failure_code(ErrorCode::BANK_DUPLICATE_RESERVE_PUB_SUBJECT))
    183             }
    184             AddIncomingResult::UnknownMapping | AddIncomingResult::MappingReuse => {
    185                 unreachable!("mapping not used")
    186             }
    187         }
    188     }
    189 
    190     async fn add_incoming_kyc(&self, req: AddKycauthRequest) -> ApiResult<AddIncomingResponse> {
    191         let debtor = FullHuPayto::try_from(&req.debit_account)?;
    192         let res = db::register_tx_in_admin(
    193             &self.pool,
    194             &TxInAdmin {
    195                 amount: req.amount,
    196                 subject: format!("Admin incoming KYC:{}", req.account_pub),
    197                 debtor,
    198                 metadata: IncomingSubject::Kyc(req.account_pub),
    199             },
    200             &Timestamp::now(),
    201         )
    202         .await?;
    203         match res {
    204             AddIncomingResult::Success {
    205                 row_id, valued_at, ..
    206             } => Ok(AddIncomingResponse {
    207                 row_id,
    208                 timestamp: date_to_utc_ts(&valued_at).into(),
    209             }),
    210             AddIncomingResult::ReservePubReuse => unreachable!("kyc"),
    211             AddIncomingResult::UnknownMapping | AddIncomingResult::MappingReuse => {
    212                 unreachable!("mapping not used")
    213             }
    214         }
    215     }
    216 
    217     async fn add_incoming_mapped(&self, req: AddMappedRequest) -> ApiResult<AddIncomingResponse> {
    218         let debtor = FullHuPayto::try_from(&req.debit_account)?;
    219         let res = db::register_tx_in_admin(
    220             &self.pool,
    221             &TxInAdmin {
    222                 amount: req.amount,
    223                 subject: format!("Admin incoming MAP:{}", req.authorization_pub),
    224                 debtor,
    225                 metadata: IncomingSubject::Map(req.authorization_pub),
    226             },
    227             &Timestamp::now(),
    228         )
    229         .await?;
    230         match res {
    231             AddIncomingResult::Success {
    232                 row_id, valued_at, ..
    233             } => Ok(AddIncomingResponse {
    234                 row_id,
    235                 timestamp: date_to_utc_ts(&valued_at).into(),
    236             }),
    237             AddIncomingResult::ReservePubReuse => {
    238                 Err(failure_code(ErrorCode::BANK_DUPLICATE_RESERVE_PUB_SUBJECT))
    239             }
    240             AddIncomingResult::UnknownMapping => {
    241                 Err(failure_code(ErrorCode::BANK_TRANSFER_MAPPING_UNKNOWN))
    242             }
    243             AddIncomingResult::MappingReuse => {
    244                 Err(failure_code(ErrorCode::BANK_TRANSFER_MAPPING_REUSED))
    245             }
    246         }
    247     }
    248 
    249     fn support_account_check(&self) -> bool {
    250         false
    251     }
    252 }
    253 
    254 impl Revenue for MagnetApi {
    255     async fn history(&self, params: History) -> ApiResult<RevenueIncomingHistory> {
    256         Ok(RevenueIncomingHistory {
    257             incoming_transactions: db::revenue_history(&self.pool, &params, || {
    258                 self.in_channel.subscribe()
    259             })
    260             .await?,
    261             credit_account: self.payto.clone(),
    262         })
    263     }
    264 }
    265 
    266 impl PreparedTransfer for MagnetApi {
    267     fn supported_formats(&self) -> &[SubjectFormat] {
    268         &[SubjectFormat::SIMPLE]
    269     }
    270 
    271     async fn registration(&self, req: RegistrationRequest) -> ApiResult<RegistrationResponse> {
    272         match db::transfer_register(&self.pool, &req).await? {
    273             db::RegistrationResult::Success => {
    274                 let simple = TransferSubject::Simple {
    275                     credit_amount: req.credit_amount,
    276                     subject: if req.authorization_pub == req.account_pub && !req.recurrent {
    277                         fmt_in_subject(req.r#type.into(), &req.account_pub).to_string()
    278                     } else {
    279                         fmt_in_subject(IncomingType::map, &req.authorization_pub).to_string()
    280                     },
    281                 };
    282                 ApiResult::Ok(RegistrationResponse {
    283                     subjects: vec![simple],
    284                     expiration: TalerTimestamp::Never,
    285                 })
    286             }
    287             db::RegistrationResult::ReservePubReuse => {
    288                 ApiResult::Err(failure_code(ErrorCode::BANK_DUPLICATE_RESERVE_PUB_SUBJECT))
    289             }
    290         }
    291     }
    292 
    293     async fn unregistration(&self, req: Unregistration) -> ApiResult<bool> {
    294         Ok(db::transfer_unregister(&self.pool, &req).await?)
    295     }
    296 }
    297 
    298 #[cfg(test)]
    299 mod test {
    300 
    301     use std::sync::{
    302         Arc, LazyLock,
    303         atomic::{AtomicU64, Ordering},
    304     };
    305 
    306     use jiff::{Timestamp, Zoned};
    307     use sqlx::{PgPool, Row as _, postgres::PgRow};
    308     use taler_api::{
    309         api::TalerRouter as _,
    310         auth::AuthMethod,
    311         db::TypeHelper as _,
    312         subject::{IncomingSubject, OutgoingSubject},
    313     };
    314     use taler_common::{
    315         api::{
    316             EddsaPublicKey,
    317             prepared::PreparedTransferConfig,
    318             revenue::RevenueConfig,
    319             wire::{TransferState, WireConfig},
    320         },
    321         db::IncomingType,
    322         types::{
    323             amount::amount,
    324             payto::{PaytoURI, payto},
    325         },
    326     };
    327     use taler_test_utils::{
    328         Router,
    329         db::db_test_setup,
    330         routine::{
    331             Status, admin_add_incoming_routine, in_history_routine, out_history_routine,
    332             registration_routine, revenue_routine, transfer_routine,
    333         },
    334         server::TestServer,
    335         tasks,
    336     };
    337 
    338     use crate::{
    339         FullHuPayto,
    340         api::MagnetApi,
    341         constants::CONFIG_SOURCE,
    342         db::{self, TxIn, TxOutKind},
    343         magnet_api::types::TxStatus,
    344         magnet_payto,
    345     };
    346 
    347     static PAYTO: LazyLock<FullHuPayto> = LazyLock::new(|| {
    348         magnet_payto("payto://iban/HU02162000031000164800000000?receiver-name=name")
    349     });
    350     static ACCOUNT: LazyLock<PaytoURI> = LazyLock::new(|| PAYTO.as_uri());
    351 
    352     async fn setup() -> (Router, PgPool) {
    353         let (_, pool) = db_test_setup(CONFIG_SOURCE).await;
    354         let api = Arc::new(MagnetApi::start(pool.clone(), ACCOUNT.clone()).await);
    355         let server = Router::new()
    356             .wire_gateway(api.clone(), AuthMethod::None)
    357             .prepared_transfer(api.clone())
    358             .revenue(api, AuthMethod::None)
    359             .finalize();
    360 
    361         (server, pool)
    362     }
    363 
    364     #[tokio::test]
    365     async fn config() {
    366         let (server, _) = setup().await;
    367         server
    368             .get("/taler-wire-gateway/config")
    369             .await
    370             .assert_ok_json::<WireConfig>();
    371         server
    372             .get("/taler-prepared-transfer/config")
    373             .await
    374             .assert_ok_json::<PreparedTransferConfig>();
    375         server
    376             .get("/taler-revenue/config")
    377             .await
    378             .assert_ok_json::<RevenueConfig>();
    379     }
    380 
    381     #[tokio::test]
    382     async fn transfer() {
    383         let (server, _) = setup().await;
    384         transfer_routine(
    385             &server,
    386             TransferState::pending,
    387             &payto("payto://iban/HU02162000031000164800000000?receiver-name=name"),
    388         )
    389         .await;
    390     }
    391 
    /// Counter shared by the test helpers to give every registered
    /// transaction a distinct `code`.
    static CODE: AtomicU64 = AtomicU64::new(0);
    393 
    394     async fn r#in(db: &PgPool, subject: Option<IncomingSubject>) {
    395         db::register_tx_in(
    396             &mut db.acquire().await.unwrap(),
    397             &TxIn {
    398                 code: CODE.fetch_add(1, Ordering::Relaxed),
    399                 amount: amount("EUR:10"),
    400                 subject: "subject".into(),
    401                 debtor: magnet_payto(
    402                     "payto://iban/HU30162000031000163100000000?receiver-name=name",
    403                 ),
    404                 value_date: Zoned::now().date(),
    405                 status: TxStatus::Completed,
    406             },
    407             &subject,
    408             &Timestamp::now(),
    409         )
    410         .await
    411         .unwrap();
    412     }
    413 
    414     async fn in_malformed(db: &PgPool) {
    415         r#in(db, None).await
    416     }
    417 
    418     async fn in_talerable(db: &PgPool) {
    419         r#in(db, Some(IncomingSubject::Reserve(EddsaPublicKey::rand()))).await
    420     }
    421 
    422     async fn out(db: &PgPool, kind: &TxOutKind) {
    423         db::register_tx_out(
    424             &mut db.acquire().await.unwrap(),
    425             &db::TxOut {
    426                 code: CODE.fetch_add(1, Ordering::Relaxed),
    427                 amount: amount("EUR:10"),
    428                 subject: "subject".into(),
    429                 creditor: PAYTO.clone(),
    430                 value_date: Zoned::now().date(),
    431                 status: TxStatus::Completed,
    432             },
    433             kind,
    434             &Timestamp::now(),
    435         )
    436         .await
    437         .unwrap();
    438     }
    439 
    440     async fn out_talerable(db: &PgPool) {
    441         out(db, &TxOutKind::Talerable(OutgoingSubject::rand())).await
    442     }
    443 
    444     async fn out_bounce(db: &PgPool) {
    445         out(db, &TxOutKind::Bounce(CODE.load(Ordering::Relaxed) as u32)).await
    446     }
    447 
    448     async fn out_malformed(db: &PgPool) {
    449         out(db, &TxOutKind::Simple).await
    450     }
    451 
    452     #[tokio::test]
    453     async fn outgoing_history() {
    454         let (server, db) = &setup().await;
    455 
    456         out_history_routine(
    457             server,
    458             tasks!({ out_talerable(db).await }),
    459             tasks!(
    460                 { out_bounce(db).await },
    461                 { out_malformed(db).await },
    462                 { in_malformed(db).await },
    463                 { in_talerable(db).await }
    464             ),
    465         )
    466         .await;
    467     }
    468 
    469     #[tokio::test]
    470     async fn admin_add_incoming() {
    471         let (server, _) = setup().await;
    472         admin_add_incoming_routine(&server, &ACCOUNT, true).await;
    473     }
    474 
    475     #[tokio::test]
    476     async fn in_history() {
    477         let (server, db) = &setup().await;
    478         in_history_routine(
    479             server,
    480             &ACCOUNT,
    481             true,
    482             tasks!({ in_talerable(db).await }),
    483             tasks!(
    484                 { out_malformed(db).await },
    485                 { out_talerable(db).await },
    486                 { out_bounce(db).await },
    487                 { in_malformed(db).await }
    488             ),
    489         )
    490         .await;
    491     }
    492 
    493     #[tokio::test]
    494     async fn revenue() {
    495         let (server, db) = &setup().await;
    496         revenue_routine(
    497             server,
    498             &ACCOUNT,
    499             true,
    500             tasks!({ in_malformed(db).await }, { in_talerable(db).await },),
    501             tasks!({ out_malformed(db).await }, { out_talerable(db).await }, {
    502                 out_bounce(db).await
    503             }),
    504         )
    505         .await;
    506     }
    507 
    508     async fn check_in(pool: &PgPool) -> Vec<Status> {
    509         sqlx::query(
    510             "
    511             SELECT pending_recurrent_in.authorization_pub IS NOT NULL, initiated_id IS NOT NULL, type, metadata
    512             FROM tx_in
    513                 LEFT JOIN taler_in USING (tx_in_id)
    514                 LEFT JOIN pending_recurrent_in USING (tx_in_id)
    515                 LEFT JOIN bounced USING (tx_in_id)
    516             ORDER BY tx_in.tx_in_id
    517         ",
    518         )
    519         .try_map(|r: PgRow| {
    520             Ok(
    521                 if r.try_get_flag(0)? {
    522                     Status::Pending
    523                 } else if r.try_get_flag(1)? {
    524                     Status::Bounced
    525                 } else {
    526                     match r.try_get(2)? {
    527                         None => Status::Simple,
    528                         Some(IncomingType::reserve) => Status::Reserve(r.try_get(3)?),
    529                         Some(IncomingType::kyc) => Status::Kyc(r.try_get(3)?),
    530                         Some(e) => unreachable!("{e:?}")
    531                     }
    532                 }
    533             )
    534         })
    535         .fetch_all(pool)
    536         .await
    537         .unwrap()
    538     }
    539 
    540     #[tokio::test]
    541     async fn registration() {
    542         let (server, pool) = setup().await;
    543         registration_routine(&server, &ACCOUNT, || check_in(&pool)).await;
    544     }
    545 }