api.rs (15794B)
1 /* 2 This file is part of TALER 3 Copyright (C) 2025, 2026 Taler Systems SA 4 5 TALER is free software; you can redistribute it and/or modify it under the 6 terms of the GNU Affero General Public License as published by the Free Software 7 Foundation; either version 3, or (at your option) any later version. 8 9 TALER is distributed in the hope that it will be useful, but WITHOUT ANY 10 WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR 11 A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. 12 13 You should have received a copy of the GNU Affero General Public License along with 14 TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> 15 */ 16 17 use jiff::Timestamp; 18 use taler_api::{ 19 api::{TalerApi, revenue::Revenue, transfer::WireTransferGateway, wire::WireGateway}, 20 error::{ApiResult, failure, failure_code}, 21 subject::{IncomingSubject, fmt_in_subject}, 22 }; 23 use taler_common::{ 24 api_common::{SafeU64, safe_u64}, 25 api_params::{History, Page}, 26 api_revenue::RevenueIncomingHistory, 27 api_transfer::{ 28 RegistrationRequest, RegistrationResponse, SubjectFormat, TransferSubject, Unregistration, 29 }, 30 api_wire::{ 31 AddIncomingRequest, AddIncomingResponse, AddKycauthRequest, AddKycauthResponse, 32 IncomingHistory, OutgoingHistory, TransferList, TransferRequest, TransferResponse, 33 TransferState, TransferStatus, 34 }, 35 db::IncomingType, 36 error_code::ErrorCode, 37 types::{payto::PaytoURI, timestamp::TalerTimestamp, utils::date_to_utc_ts}, 38 }; 39 use tokio::sync::watch::Sender; 40 41 use crate::{ 42 FullHuPayto, 43 constants::CURRENCY, 44 db::{self, AddIncomingResult, Transfer, TxInAdmin}, 45 }; 46 47 pub struct MagnetApi { 48 pub pool: sqlx::PgPool, 49 pub payto: PaytoURI, 50 pub in_channel: Sender<i64>, 51 pub taler_in_channel: Sender<i64>, 52 pub out_channel: Sender<i64>, 53 pub taler_out_channel: Sender<i64>, 54 } 55 56 impl MagnetApi { 57 pub async fn start(pool: sqlx::PgPool, payto: PaytoURI) -> Self { 58 let in_channel = Sender::new(0); 59 let taler_in_channel = Sender::new(0); 60 let out_channel = Sender::new(0); 61 let taler_out_channel = Sender::new(0); 62 let tmp = Self { 63 pool: pool.clone(), 64 payto, 65 in_channel: in_channel.clone(), 66 taler_in_channel: taler_in_channel.clone(), 67 out_channel: out_channel.clone(), 68 taler_out_channel: taler_out_channel.clone(), 69 }; 70 tokio::spawn(db::notification_listener( 71 pool, 72 in_channel, 73 taler_in_channel, 74 out_channel, 75 taler_out_channel, 76 )); 77 tmp 78 } 79 } 80 81 impl TalerApi for MagnetApi { 82 fn currency(&self) -> &str { 83 CURRENCY.as_ref() 84 } 85 86 fn implementation(&self) -> &'static str { 87 "urn:net:taler:specs:taler-magnet-bank:taler-rust" 88 } 89 } 90 91 impl WireGateway for MagnetApi { 92 async fn transfer(&self, req: TransferRequest) -> ApiResult<TransferResponse> { 93 let creditor = FullHuPayto::try_from(&req.credit_account)?; 94 let result = db::make_transfer( 95 &self.pool, 96 &Transfer { 97 request_uid: req.request_uid, 98 wtid: req.wtid, 99 amount: req.amount.decimal(), 100 metadata: req.metadata, 101 creditor, 102 exchange_base_url: req.exchange_base_url, 103 }, 104 &Timestamp::now(), 105 ) 106 .await?; 107 match result { 108 db::TransferResult::Success { id, initiated_at } => Ok(TransferResponse { 109 timestamp: initiated_at.into(), 110 row_id: SafeU64::try_from(id).unwrap(), 111 }), 112 db::TransferResult::RequestUidReuse => { 113 Err(failure_code(ErrorCode::BANK_TRANSFER_REQUEST_UID_REUSED)) 114 } 115 db::TransferResult::WtidReuse => { 116 Err(failure_code(ErrorCode::BANK_TRANSFER_WTID_REUSED)) 117 } 118 } 119 } 120 121 async fn transfer_page( 122 &self, 123 page: Page, 124 status: Option<TransferState>, 125 ) -> ApiResult<TransferList> { 126 Ok(TransferList { 127 transfers: db::transfer_page(&self.pool, &status, &page).await?, 128 debit_account: self.payto.clone(), 129 }) 130 } 131 132 async fn transfer_by_id(&self, id: u64) -> ApiResult<Option<TransferStatus>> { 133 Ok(db::transfer_by_id(&self.pool, id).await?) 134 } 135 136 async fn outgoing_history(&self, params: History) -> ApiResult<OutgoingHistory> { 137 Ok(OutgoingHistory { 138 outgoing_transactions: db::outgoing_history(&self.pool, ¶ms, || { 139 self.taler_out_channel.subscribe() 140 }) 141 .await?, 142 debit_account: self.payto.clone(), 143 }) 144 } 145 146 async fn incoming_history(&self, params: History) -> ApiResult<IncomingHistory> { 147 Ok(IncomingHistory { 148 incoming_transactions: db::incoming_history(&self.pool, ¶ms, || { 149 self.taler_in_channel.subscribe() 150 }) 151 .await?, 152 credit_account: self.payto.clone(), 153 }) 154 } 155 156 async fn add_incoming_reserve( 157 &self, 158 req: AddIncomingRequest, 159 ) -> ApiResult<AddIncomingResponse> { 160 let debtor = FullHuPayto::try_from(&req.debit_account)?; 161 let res = db::register_tx_in_admin( 162 &self.pool, 163 &TxInAdmin { 164 amount: req.amount, 165 subject: format!("Admin incoming {}", req.reserve_pub), 166 debtor, 167 metadata: IncomingSubject::Reserve(req.reserve_pub), 168 }, 169 &Timestamp::now(), 170 ) 171 .await?; 172 match res { 173 AddIncomingResult::Success { 174 row_id, valued_at, .. 175 } => Ok(AddIncomingResponse { 176 row_id: safe_u64(row_id), 177 timestamp: date_to_utc_ts(&valued_at).into(), 178 }), 179 AddIncomingResult::ReservePubReuse => { 180 Err(failure_code(ErrorCode::BANK_DUPLICATE_RESERVE_PUB_SUBJECT)) 181 } 182 AddIncomingResult::UnknownMapping | AddIncomingResult::MappingReuse => { 183 unreachable!("mapping not used") 184 } 185 } 186 } 187 188 async fn add_incoming_kyc(&self, req: AddKycauthRequest) -> ApiResult<AddKycauthResponse> { 189 let debtor = FullHuPayto::try_from(&req.debit_account)?; 190 let res = db::register_tx_in_admin( 191 &self.pool, 192 &TxInAdmin { 193 amount: req.amount, 194 subject: format!("Admin incoming KYC:{}", req.account_pub), 195 debtor, 196 metadata: IncomingSubject::Kyc(req.account_pub), 197 }, 198 &Timestamp::now(), 199 ) 200 .await?; 201 match res { 202 AddIncomingResult::Success { 203 row_id, valued_at, .. 204 } => Ok(AddKycauthResponse { 205 row_id: safe_u64(row_id), 206 timestamp: date_to_utc_ts(&valued_at).into(), 207 }), 208 AddIncomingResult::ReservePubReuse => { 209 Err(failure_code(ErrorCode::BANK_DUPLICATE_RESERVE_PUB_SUBJECT)) 210 } 211 AddIncomingResult::UnknownMapping | AddIncomingResult::MappingReuse => { 212 unreachable!("mapping not used") 213 } 214 } 215 } 216 217 fn support_account_check(&self) -> bool { 218 false 219 } 220 } 221 222 impl Revenue for MagnetApi { 223 async fn history(&self, params: History) -> ApiResult<RevenueIncomingHistory> { 224 Ok(RevenueIncomingHistory { 225 incoming_transactions: db::revenue_history(&self.pool, ¶ms, || { 226 self.in_channel.subscribe() 227 }) 228 .await?, 229 credit_account: self.payto.clone(), 230 }) 231 } 232 } 233 234 impl WireTransferGateway for MagnetApi { 235 fn supported_formats(&self) -> &[SubjectFormat] { 236 &[SubjectFormat::SIMPLE] 237 } 238 239 async fn registration(&self, req: RegistrationRequest) -> ApiResult<RegistrationResponse> { 240 match db::transfer_register(&self.pool, &req).await? { 241 db::RegistrationResult::Success => { 242 let simple = TransferSubject::Simple { 243 credit_amount: req.credit_amount, 244 subject: if req.authorization_pub == req.account_pub && !req.recurrent { 245 fmt_in_subject(req.r#type.into(), &req.account_pub) 246 } else { 247 fmt_in_subject(IncomingType::map, &req.authorization_pub) 248 }, 249 }; 250 ApiResult::Ok(RegistrationResponse { 251 subjects: vec![simple], 252 expiration: TalerTimestamp::Never, 253 }) 254 } 255 db::RegistrationResult::ReservePubReuse => { 256 ApiResult::Err(failure_code(ErrorCode::BANK_DUPLICATE_RESERVE_PUB_SUBJECT)) 257 } 258 } 259 } 260 261 async fn unregistration(&self, req: Unregistration) -> ApiResult<()> { 262 if !db::transfer_unregister(&self.pool, &req).await? { 263 Err(failure( 264 ErrorCode::BANK_TRANSACTION_NOT_FOUND, 265 format!("Prepared transfer '{}' not found", req.authorization_pub), 266 )) 267 } else { 268 Ok(()) 269 } 270 } 271 } 272 273 #[cfg(test)] 274 mod test { 275 276 use std::sync::{Arc, LazyLock}; 277 278 use crate::{ 279 FullHuPayto, 280 api::MagnetApi, 281 constants::CONFIG_SOURCE, 282 db::{self, AddIncomingResult, TxIn, TxOutKind}, 283 magnet_api::types::TxStatus, 284 magnet_payto, 285 }; 286 287 use jiff::{Timestamp, Zoned}; 288 use sqlx::{PgPool, Row as _, postgres::PgRow}; 289 use taler_api::{ 290 api::TalerRouter as _, 291 auth::AuthMethod, 292 db::TypeHelper as _, 293 subject::{IncomingSubject, OutgoingSubject}, 294 }; 295 use taler_common::{ 296 api_common::EddsaPublicKey, 297 api_revenue::RevenueConfig, 298 api_transfer::WireTransferConfig, 299 api_wire::{OutgoingHistory, TransferState, WireConfig}, 300 db::IncomingType, 301 types::{ 302 amount::amount, 303 payto::{PaytoURI, payto}, 304 }, 305 }; 306 use taler_test_utils::{ 307 Router, 308 db::db_test_setup, 309 routine::{ 310 Status, admin_add_incoming_routine, registration_routine, revenue_routine, 311 routine_pagination, transfer_routine, 312 }, 313 server::TestServer, 314 }; 315 316 static PAYTO: LazyLock<FullHuPayto> = LazyLock::new(|| { 317 magnet_payto("payto://iban/HU02162000031000164800000000?receiver-name=name") 318 }); 319 static ACCOUNT: LazyLock<PaytoURI> = LazyLock::new(|| PAYTO.as_payto()); 320 321 async fn setup() -> (Router, PgPool) { 322 let (_, pool) = db_test_setup(CONFIG_SOURCE).await; 323 let api = Arc::new(MagnetApi::start(pool.clone(), ACCOUNT.clone()).await); 324 let server = Router::new() 325 .wire_gateway(api.clone(), AuthMethod::None) 326 .wire_transfer_gateway(api.clone()) 327 .revenue(api, AuthMethod::None) 328 .finalize(); 329 330 (server, pool) 331 } 332 333 #[tokio::test] 334 async fn config() { 335 let (server, _) = setup().await; 336 server 337 .get("/taler-wire-gateway/config") 338 .await 339 .assert_ok_json::<WireConfig>(); 340 server 341 .get("/taler-wire-transfer-gateway/config") 342 .await 343 .assert_ok_json::<WireTransferConfig>(); 344 server 345 .get("/taler-revenue/config") 346 .await 347 .assert_ok_json::<RevenueConfig>(); 348 } 349 350 #[tokio::test] 351 async fn transfer() { 352 let (server, _) = setup().await; 353 transfer_routine( 354 &server, 355 TransferState::pending, 356 &payto("payto://iban/HU02162000031000164800000000?receiver-name=name"), 357 ) 358 .await; 359 } 360 361 #[tokio::test] 362 async fn outgoing_history() { 363 let (server, pool) = setup().await; 364 routine_pagination::<OutgoingHistory, _>( 365 &server, 366 "/taler-wire-gateway/history/outgoing", 367 async |i| { 368 let mut conn = pool.acquire().await.unwrap(); 369 let now = Zoned::now().date(); 370 db::register_tx_out( 371 &mut conn, 372 &db::TxOut { 373 code: i as u64, 374 amount: amount("EUR:10"), 375 subject: "subject".into(), 376 creditor: PAYTO.clone(), 377 value_date: now, 378 status: TxStatus::Completed, 379 }, 380 &TxOutKind::Talerable(OutgoingSubject::rand()), 381 &Timestamp::now(), 382 ) 383 .await 384 .unwrap(); 385 }, 386 ) 387 .await; 388 } 389 390 #[tokio::test] 391 async fn admin_add_incoming() { 392 let (server, _) = setup().await; 393 admin_add_incoming_routine(&server, &ACCOUNT, true).await; 394 } 395 396 #[tokio::test] 397 async fn revenue() { 398 let (server, _) = setup().await; 399 revenue_routine(&server, &ACCOUNT, true).await; 400 } 401 402 async fn check_in(pool: &PgPool) -> Vec<Status> { 403 sqlx::query( 404 " 405 SELECT pending_recurrent_in.authorization_pub IS NOT NULL, initiated_id IS NOT NULL, type, metadata 406 FROM tx_in 407 LEFT JOIN taler_in USING (tx_in_id) 408 LEFT JOIN pending_recurrent_in USING (tx_in_id) 409 LEFT JOIN bounced USING (tx_in_id) 410 ORDER BY tx_in.tx_in_id 411 ", 412 ) 413 .try_map(|r: PgRow| { 414 Ok( 415 if r.try_get_flag(0)? { 416 Status::Pending 417 } else if r.try_get_flag(1)? { 418 Status::Bounced 419 } else { 420 match r.try_get(2)? { 421 None => Status::Simple, 422 Some(IncomingType::reserve) => Status::Reserve(r.try_get(3)?), 423 Some(IncomingType::kyc) => Status::Kyc(r.try_get(3)?), 424 Some(e) => unreachable!("{e:?}") 425 } 426 } 427 ) 428 }) 429 .fetch_all(pool) 430 .await 431 .unwrap() 432 } 433 434 pub async fn test_in(pool: &PgPool, key: EddsaPublicKey) { 435 let tx = TxIn { 436 code: rand::random_range(10..10000), 437 amount: amount("EUR:12"), 438 subject: Box::default(), 439 debtor: PAYTO.clone(), 440 value_date: Zoned::now().date(), 441 status: TxStatus::Completed, 442 }; 443 let mut db = pool.acquire().await.unwrap(); 444 let reason = match db::register_tx_in( 445 &mut db, 446 &tx, 447 &Some(IncomingSubject::Map(key)), 448 &Timestamp::now(), 449 ) 450 .await 451 .unwrap() 452 { 453 AddIncomingResult::Success { .. } => return, 454 AddIncomingResult::ReservePubReuse => "reserve pub reuse", 455 AddIncomingResult::UnknownMapping => "unknown mapping", 456 AddIncomingResult::MappingReuse => "mapping reuse", 457 }; 458 db::register_bounce_tx_in(&mut db, &tx, reason, &Timestamp::now()) 459 .await 460 .unwrap(); 461 } 462 463 #[tokio::test] 464 async fn registration() { 465 let (server, pool) = setup().await; 466 registration_routine( 467 &server, 468 &ACCOUNT, 469 || check_in(&pool), 470 |account_pub| { 471 let account_pub = account_pub.clone(); 472 let pool = &pool; 473 async move { test_in(pool, account_pub).await } 474 }, 475 ) 476 .await; 477 } 478 }