api.rs (17482B)
1 /* 2 This file is part of TALER 3 Copyright (C) 2025, 2026 Taler Systems SA 4 5 TALER is free software; you can redistribute it and/or modify it under the 6 terms of the GNU Affero General Public License as published by the Free Software 7 Foundation; either version 3, or (at your option) any later version. 8 9 TALER is distributed in the hope that it will be useful, but WITHOUT ANY 10 WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR 11 A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. 12 13 You should have received a copy of the GNU Affero General Public License along with 14 TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> 15 */ 16 17 use jiff::Timestamp; 18 use taler_api::{ 19 api::{TalerApi, prepared::PreparedTransfer, revenue::Revenue, wire::WireGateway}, 20 error::{ApiResult, failure_code}, 21 subject::{IncomingSubject, fmt_in_subject}, 22 }; 23 use taler_common::{ 24 api::{ 25 params::{History, Page}, 26 prepared::{ 27 RegistrationRequest, RegistrationResponse, SubjectFormat, TransferSubject, 28 Unregistration, 29 }, 30 revenue::RevenueIncomingHistory, 31 wire::{ 32 AddIncomingRequest, AddIncomingResponse, AddKycauthRequest, AddMappedRequest, 33 IncomingHistory, OutgoingHistory, TransferList, TransferRequest, TransferResponse, 34 TransferState, TransferStatus, 35 }, 36 }, 37 db::IncomingType, 38 error_code::ErrorCode, 39 types::{amount::Currency, payto::PaytoURI, timestamp::TalerTimestamp, utils::date_to_utc_ts}, 40 }; 41 use tokio::sync::watch::Sender; 42 43 use crate::{ 44 FullHuPayto, 45 constants::CURR, 46 db::{self, AddIncomingResult, Transfer, TxInAdmin}, 47 }; 48 49 pub struct MagnetApi { 50 pub pool: sqlx::PgPool, 51 pub payto: PaytoURI, 52 pub in_channel: Sender<i64>, 53 pub taler_in_channel: Sender<i64>, 54 pub out_channel: Sender<i64>, 55 pub taler_out_channel: Sender<i64>, 56 } 57 58 impl MagnetApi { 59 pub async fn start(pool: sqlx::PgPool, payto: PaytoURI) -> Self { 60 let in_channel = Sender::new(0); 61 let taler_in_channel = Sender::new(0); 62 let out_channel = Sender::new(0); 63 let taler_out_channel = Sender::new(0); 64 let tmp = Self { 65 pool: pool.clone(), 66 payto, 67 in_channel: in_channel.clone(), 68 taler_in_channel: taler_in_channel.clone(), 69 out_channel: out_channel.clone(), 70 taler_out_channel: taler_out_channel.clone(), 71 }; 72 tokio::spawn(db::notification_listener( 73 pool, 74 in_channel, 75 taler_in_channel, 76 out_channel, 77 taler_out_channel, 78 )); 79 tmp 80 } 81 } 82 83 impl TalerApi for MagnetApi { 84 fn currency(&self) -> Currency { 85 CURR 86 } 87 88 fn implementation(&self) -> &'static str { 89 "urn:net:taler:specs:taler-magnet-bank:taler-rust" 90 } 91 } 92 93 impl WireGateway for MagnetApi { 94 async fn transfer(&self, req: TransferRequest) -> ApiResult<TransferResponse> { 95 let creditor = FullHuPayto::try_from(&req.credit_account)?; 96 let result = db::make_transfer( 97 &self.pool, 98 &Transfer { 99 request_uid: req.request_uid, 100 wtid: req.wtid, 101 amount: req.amount.decimal(), 102 metadata: req.metadata, 103 creditor, 104 exchange_base_url: req.exchange_base_url, 105 }, 106 &Timestamp::now(), 107 ) 108 .await?; 109 match result { 110 db::TransferResult::Success { id, initiated_at } => Ok(TransferResponse { 111 timestamp: initiated_at.into(), 112 row_id: id, 113 }), 114 db::TransferResult::RequestUidReuse => { 115 Err(failure_code(ErrorCode::BANK_TRANSFER_REQUEST_UID_REUSED)) 116 } 117 db::TransferResult::WtidReuse => { 118 Err(failure_code(ErrorCode::BANK_TRANSFER_WTID_REUSED)) 119 } 120 } 121 } 122 123 async fn transfer_page( 124 &self, 125 page: Page, 126 status: Option<TransferState>, 127 ) -> ApiResult<TransferList> { 128 Ok(TransferList { 129 transfers: db::transfer_page(&self.pool, &status, &page).await?, 130 debit_account: self.payto.clone(), 131 }) 132 } 133 134 async fn transfer_by_id(&self, id: u64) -> ApiResult<Option<TransferStatus>> { 135 Ok(db::transfer_by_id(&self.pool, id).await?) 136 } 137 138 async fn outgoing_history(&self, params: History) -> ApiResult<OutgoingHistory> { 139 Ok(OutgoingHistory { 140 outgoing_transactions: db::outgoing_history(&self.pool, ¶ms, || { 141 self.taler_out_channel.subscribe() 142 }) 143 .await?, 144 debit_account: self.payto.clone(), 145 }) 146 } 147 148 async fn incoming_history(&self, params: History) -> ApiResult<IncomingHistory> { 149 Ok(IncomingHistory { 150 incoming_transactions: db::incoming_history(&self.pool, ¶ms, || { 151 self.taler_in_channel.subscribe() 152 }) 153 .await?, 154 credit_account: self.payto.clone(), 155 }) 156 } 157 158 async fn add_incoming_reserve( 159 &self, 160 req: AddIncomingRequest, 161 ) -> ApiResult<AddIncomingResponse> { 162 let debtor = FullHuPayto::try_from(&req.debit_account)?; 163 let res = db::register_tx_in_admin( 164 &self.pool, 165 &TxInAdmin { 166 amount: req.amount, 167 subject: format!("Admin incoming {}", req.reserve_pub), 168 debtor, 169 metadata: IncomingSubject::Reserve(req.reserve_pub), 170 }, 171 &Timestamp::now(), 172 ) 173 .await?; 174 match res { 175 AddIncomingResult::Success { 176 row_id, valued_at, .. 177 } => Ok(AddIncomingResponse { 178 row_id, 179 timestamp: date_to_utc_ts(&valued_at).into(), 180 }), 181 AddIncomingResult::ReservePubReuse => { 182 Err(failure_code(ErrorCode::BANK_DUPLICATE_RESERVE_PUB_SUBJECT)) 183 } 184 AddIncomingResult::UnknownMapping | AddIncomingResult::MappingReuse => { 185 unreachable!("mapping not used") 186 } 187 } 188 } 189 190 async fn add_incoming_kyc(&self, req: AddKycauthRequest) -> ApiResult<AddIncomingResponse> { 191 let debtor = FullHuPayto::try_from(&req.debit_account)?; 192 let res = db::register_tx_in_admin( 193 &self.pool, 194 &TxInAdmin { 195 amount: req.amount, 196 subject: format!("Admin incoming KYC:{}", req.account_pub), 197 debtor, 198 metadata: IncomingSubject::Kyc(req.account_pub), 199 }, 200 &Timestamp::now(), 201 ) 202 .await?; 203 match res { 204 AddIncomingResult::Success { 205 row_id, valued_at, .. 206 } => Ok(AddIncomingResponse { 207 row_id, 208 timestamp: date_to_utc_ts(&valued_at).into(), 209 }), 210 AddIncomingResult::ReservePubReuse => unreachable!("kyc"), 211 AddIncomingResult::UnknownMapping | AddIncomingResult::MappingReuse => { 212 unreachable!("mapping not used") 213 } 214 } 215 } 216 217 async fn add_incoming_mapped(&self, req: AddMappedRequest) -> ApiResult<AddIncomingResponse> { 218 let debtor = FullHuPayto::try_from(&req.debit_account)?; 219 let res = db::register_tx_in_admin( 220 &self.pool, 221 &TxInAdmin { 222 amount: req.amount, 223 subject: format!("Admin incoming MAP:{}", req.authorization_pub), 224 debtor, 225 metadata: IncomingSubject::Map(req.authorization_pub), 226 }, 227 &Timestamp::now(), 228 ) 229 .await?; 230 match res { 231 AddIncomingResult::Success { 232 row_id, valued_at, .. 233 } => Ok(AddIncomingResponse { 234 row_id, 235 timestamp: date_to_utc_ts(&valued_at).into(), 236 }), 237 AddIncomingResult::ReservePubReuse => { 238 Err(failure_code(ErrorCode::BANK_DUPLICATE_RESERVE_PUB_SUBJECT)) 239 } 240 AddIncomingResult::UnknownMapping => { 241 Err(failure_code(ErrorCode::BANK_TRANSFER_MAPPING_UNKNOWN)) 242 } 243 AddIncomingResult::MappingReuse => { 244 Err(failure_code(ErrorCode::BANK_TRANSFER_MAPPING_REUSED)) 245 } 246 } 247 } 248 249 fn support_account_check(&self) -> bool { 250 false 251 } 252 } 253 254 impl Revenue for MagnetApi { 255 async fn history(&self, params: History) -> ApiResult<RevenueIncomingHistory> { 256 Ok(RevenueIncomingHistory { 257 incoming_transactions: db::revenue_history(&self.pool, ¶ms, || { 258 self.in_channel.subscribe() 259 }) 260 .await?, 261 credit_account: self.payto.clone(), 262 }) 263 } 264 } 265 266 impl PreparedTransfer for MagnetApi { 267 fn supported_formats(&self) -> &[SubjectFormat] { 268 &[SubjectFormat::SIMPLE] 269 } 270 271 async fn registration(&self, req: RegistrationRequest) -> ApiResult<RegistrationResponse> { 272 match db::transfer_register(&self.pool, &req).await? { 273 db::RegistrationResult::Success => { 274 let simple = TransferSubject::Simple { 275 credit_amount: req.credit_amount, 276 subject: if req.authorization_pub == req.account_pub && !req.recurrent { 277 fmt_in_subject(req.r#type.into(), &req.account_pub).to_string() 278 } else { 279 fmt_in_subject(IncomingType::map, &req.authorization_pub).to_string() 280 }, 281 }; 282 ApiResult::Ok(RegistrationResponse { 283 subjects: vec![simple], 284 expiration: TalerTimestamp::Never, 285 }) 286 } 287 db::RegistrationResult::ReservePubReuse => { 288 ApiResult::Err(failure_code(ErrorCode::BANK_DUPLICATE_RESERVE_PUB_SUBJECT)) 289 } 290 } 291 } 292 293 async fn unregistration(&self, req: Unregistration) -> ApiResult<bool> { 294 Ok(db::transfer_unregister(&self.pool, &req).await?) 295 } 296 } 297 298 #[cfg(test)] 299 mod test { 300 301 use std::sync::{ 302 Arc, LazyLock, 303 atomic::{AtomicU64, Ordering}, 304 }; 305 306 use jiff::{Timestamp, Zoned}; 307 use sqlx::{PgPool, Row as _, postgres::PgRow}; 308 use taler_api::{ 309 api::TalerRouter as _, 310 auth::AuthMethod, 311 db::TypeHelper as _, 312 subject::{IncomingSubject, OutgoingSubject}, 313 }; 314 use taler_common::{ 315 api::{ 316 EddsaPublicKey, 317 prepared::PreparedTransferConfig, 318 revenue::RevenueConfig, 319 wire::{TransferState, WireConfig}, 320 }, 321 db::IncomingType, 322 types::{ 323 amount::amount, 324 payto::{PaytoURI, payto}, 325 }, 326 }; 327 use taler_test_utils::{ 328 Router, 329 db::db_test_setup, 330 routine::{ 331 Status, admin_add_incoming_routine, in_history_routine, out_history_routine, 332 registration_routine, revenue_routine, transfer_routine, 333 }, 334 server::TestServer, 335 tasks, 336 }; 337 338 use crate::{ 339 FullHuPayto, 340 api::MagnetApi, 341 constants::CONFIG_SOURCE, 342 db::{self, TxIn, TxOutKind}, 343 magnet_api::types::TxStatus, 344 magnet_payto, 345 }; 346 347 static PAYTO: LazyLock<FullHuPayto> = LazyLock::new(|| { 348 magnet_payto("payto://iban/HU02162000031000164800000000?receiver-name=name") 349 }); 350 static ACCOUNT: LazyLock<PaytoURI> = LazyLock::new(|| PAYTO.as_uri()); 351 352 async fn setup() -> (Router, PgPool) { 353 let (_, pool) = db_test_setup(CONFIG_SOURCE).await; 354 let api = Arc::new(MagnetApi::start(pool.clone(), ACCOUNT.clone()).await); 355 let server = Router::new() 356 .wire_gateway(api.clone(), AuthMethod::None) 357 .prepared_transfer(api.clone()) 358 .revenue(api, AuthMethod::None) 359 .finalize(); 360 361 (server, pool) 362 } 363 364 #[tokio::test] 365 async fn config() { 366 let (server, _) = setup().await; 367 server 368 .get("/taler-wire-gateway/config") 369 .await 370 .assert_ok_json::<WireConfig>(); 371 server 372 .get("/taler-prepared-transfer/config") 373 .await 374 .assert_ok_json::<PreparedTransferConfig>(); 375 server 376 .get("/taler-revenue/config") 377 .await 378 .assert_ok_json::<RevenueConfig>(); 379 } 380 381 #[tokio::test] 382 async fn transfer() { 383 let (server, _) = setup().await; 384 transfer_routine( 385 &server, 386 TransferState::pending, 387 &payto("payto://iban/HU02162000031000164800000000?receiver-name=name"), 388 ) 389 .await; 390 } 391 392 static CODE: AtomicU64 = AtomicU64::new(0); 393 394 async fn r#in(db: &PgPool, subject: Option<IncomingSubject>) { 395 db::register_tx_in( 396 &mut db.acquire().await.unwrap(), 397 &TxIn { 398 code: CODE.fetch_add(1, Ordering::Relaxed), 399 amount: amount("EUR:10"), 400 subject: "subject".into(), 401 debtor: magnet_payto( 402 "payto://iban/HU30162000031000163100000000?receiver-name=name", 403 ), 404 value_date: Zoned::now().date(), 405 status: TxStatus::Completed, 406 }, 407 &subject, 408 &Timestamp::now(), 409 ) 410 .await 411 .unwrap(); 412 } 413 414 async fn in_malformed(db: &PgPool) { 415 r#in(db, None).await 416 } 417 418 async fn in_talerable(db: &PgPool) { 419 r#in(db, Some(IncomingSubject::Reserve(EddsaPublicKey::rand()))).await 420 } 421 422 async fn out(db: &PgPool, kind: &TxOutKind) { 423 db::register_tx_out( 424 &mut db.acquire().await.unwrap(), 425 &db::TxOut { 426 code: CODE.fetch_add(1, Ordering::Relaxed), 427 amount: amount("EUR:10"), 428 subject: "subject".into(), 429 creditor: PAYTO.clone(), 430 value_date: Zoned::now().date(), 431 status: TxStatus::Completed, 432 }, 433 kind, 434 &Timestamp::now(), 435 ) 436 .await 437 .unwrap(); 438 } 439 440 async fn out_talerable(db: &PgPool) { 441 out(db, &TxOutKind::Talerable(OutgoingSubject::rand())).await 442 } 443 444 async fn out_bounce(db: &PgPool) { 445 out(db, &TxOutKind::Bounce(CODE.load(Ordering::Relaxed) as u32)).await 446 } 447 448 async fn out_malformed(db: &PgPool) { 449 out(db, &TxOutKind::Simple).await 450 } 451 452 #[tokio::test] 453 async fn outgoing_history() { 454 let (server, db) = &setup().await; 455 456 out_history_routine( 457 server, 458 tasks!({ out_talerable(db).await }), 459 tasks!( 460 { out_bounce(db).await }, 461 { out_malformed(db).await }, 462 { in_malformed(db).await }, 463 { in_talerable(db).await } 464 ), 465 ) 466 .await; 467 } 468 469 #[tokio::test] 470 async fn admin_add_incoming() { 471 let (server, _) = setup().await; 472 admin_add_incoming_routine(&server, &ACCOUNT, true).await; 473 } 474 475 #[tokio::test] 476 async fn in_history() { 477 let (server, db) = &setup().await; 478 in_history_routine( 479 server, 480 &ACCOUNT, 481 true, 482 tasks!({ in_talerable(db).await }), 483 tasks!( 484 { out_malformed(db).await }, 485 { out_talerable(db).await }, 486 { out_bounce(db).await }, 487 { in_malformed(db).await } 488 ), 489 ) 490 .await; 491 } 492 493 #[tokio::test] 494 async fn revenue() { 495 let (server, db) = &setup().await; 496 revenue_routine( 497 server, 498 &ACCOUNT, 499 true, 500 tasks!({ in_malformed(db).await }, { in_talerable(db).await },), 501 tasks!({ out_malformed(db).await }, { out_talerable(db).await }, { 502 out_bounce(db).await 503 }), 504 ) 505 .await; 506 } 507 508 async fn check_in(pool: &PgPool) -> Vec<Status> { 509 sqlx::query( 510 " 511 SELECT pending_recurrent_in.authorization_pub IS NOT NULL, initiated_id IS NOT NULL, type, metadata 512 FROM tx_in 513 LEFT JOIN taler_in USING (tx_in_id) 514 LEFT JOIN pending_recurrent_in USING (tx_in_id) 515 LEFT JOIN bounced USING (tx_in_id) 516 ORDER BY tx_in.tx_in_id 517 ", 518 ) 519 .try_map(|r: PgRow| { 520 Ok( 521 if r.try_get_flag(0)? { 522 Status::Pending 523 } else if r.try_get_flag(1)? { 524 Status::Bounced 525 } else { 526 match r.try_get(2)? { 527 None => Status::Simple, 528 Some(IncomingType::reserve) => Status::Reserve(r.try_get(3)?), 529 Some(IncomingType::kyc) => Status::Kyc(r.try_get(3)?), 530 Some(e) => unreachable!("{e:?}") 531 } 532 } 533 ) 534 }) 535 .fetch_all(pool) 536 .await 537 .unwrap() 538 } 539 540 #[tokio::test] 541 async fn registration() { 542 let (server, pool) = setup().await; 543 registration_routine(&server, &ACCOUNT, || check_in(&pool)).await; 544 } 545 }