api.rs (18235B)
1 /* 2 This file is part of TALER 3 Copyright (C) 2025, 2026 Taler Systems SA 4 5 TALER is free software; you can redistribute it and/or modify it under the 6 terms of the GNU Affero General Public License as published by the Free Software 7 Foundation; either version 3, or (at your option) any later version. 8 9 TALER is distributed in the hope that it will be useful, but WITHOUT ANY 10 WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR 11 A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. 12 13 You should have received a copy of the GNU Affero General Public License along with 14 TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> 15 */ 16 17 use compact_str::CompactString; 18 use jiff::Timestamp; 19 use taler_api::{ 20 api::{TalerApi, prepared::PreparedTransfer, revenue::Revenue, wire::WireGateway}, 21 error::{ApiResult, failure_code}, 22 subject::{IncomingSubject, fmt_in_subject}, 23 }; 24 use taler_common::{ 25 api::{ 26 params::{History, Page}, 27 prepared::{ 28 RegistrationRequest, RegistrationResponse, SubjectFormat, TransferSubject, 29 Unregistration, 30 }, 31 revenue::RevenueIncomingHistory, 32 wire::{ 33 AddIncomingRequest, AddIncomingResponse, AddKycauthRequest, AddMappedRequest, 34 IncomingHistory, OutgoingHistory, TransferList, TransferRequest, TransferResponse, 35 TransferState, TransferStatus, 36 }, 37 }, 38 db::IncomingType, 39 error_code::ErrorCode, 40 types::{amount::Currency, payto::PaytoURI, timestamp::TalerTimestamp}, 41 }; 42 use tokio::sync::watch::Sender; 43 44 use crate::{ 45 db::{self, AddIncomingResult, Transfer, TxInAdmin}, 46 payto::FullCyclosPayto, 47 }; 48 49 pub struct CyclosApi { 50 pub pool: sqlx::PgPool, 51 pub currency: Currency, 52 pub payto: PaytoURI, 53 pub in_channel: Sender<i64>, 54 pub taler_in_channel: Sender<i64>, 55 pub out_channel: Sender<i64>, 56 pub taler_out_channel: Sender<i64>, 57 pub root: CompactString, 58 } 59 60 impl CyclosApi { 61 pub fn start( 62 pool: sqlx::PgPool, 63 root: CompactString, 64 payto: PaytoURI, 65 currency: Currency, 66 ) -> Self { 67 let in_channel = Sender::new(0); 68 let taler_in_channel = Sender::new(0); 69 let out_channel = Sender::new(0); 70 let taler_out_channel = Sender::new(0); 71 let tmp = Self { 72 pool: pool.clone(), 73 payto, 74 currency, 75 root, 76 in_channel: in_channel.clone(), 77 taler_in_channel: taler_in_channel.clone(), 78 out_channel: out_channel.clone(), 79 taler_out_channel: taler_out_channel.clone(), 80 }; 81 tokio::spawn(db::notification_listener( 82 pool, 83 in_channel, 84 taler_in_channel, 85 out_channel, 86 taler_out_channel, 87 )); 88 tmp 89 } 90 } 91 92 impl TalerApi for CyclosApi { 93 fn currency(&self) -> Currency { 94 self.currency 95 } 96 97 fn implementation(&self) -> &'static str { 98 "urn:net:taler:specs:taler-cyclos:taler-rust" 99 } 100 } 101 102 impl WireGateway for CyclosApi { 103 async fn transfer(&self, req: TransferRequest) -> ApiResult<TransferResponse> { 104 let creditor = FullCyclosPayto::try_from(&req.credit_account)?; 105 let result = db::make_transfer( 106 &self.pool, 107 &Transfer { 108 request_uid: req.request_uid, 109 amount: req.amount.decimal(), 110 exchange_base_url: req.exchange_base_url, 111 metadata: req.metadata, 112 wtid: req.wtid, 113 creditor_id: *creditor.id, 114 creditor_name: creditor.name, 115 }, 116 &Timestamp::now(), 117 ) 118 .await?; 119 match result { 120 db::TransferResult::Success { id, initiated_at } => Ok(TransferResponse { 121 timestamp: initiated_at.into(), 122 row_id: id, 123 }), 124 db::TransferResult::RequestUidReuse => { 125 Err(failure_code(ErrorCode::BANK_TRANSFER_REQUEST_UID_REUSED)) 126 } 127 db::TransferResult::WtidReuse => { 128 Err(failure_code(ErrorCode::BANK_TRANSFER_WTID_REUSED)) 129 } 130 } 131 } 132 133 async fn transfer_page( 134 &self, 135 page: Page, 136 status: Option<TransferState>, 137 ) -> ApiResult<TransferList> { 138 Ok(TransferList { 139 transfers: db::transfer_page(&self.pool, &status, &self.currency, &self.root, &page) 140 .await?, 141 debit_account: self.payto.clone(), 142 }) 143 } 144 145 async fn transfer_by_id(&self, id: u64) -> ApiResult<Option<TransferStatus>> { 146 Ok(db::transfer_by_id(&self.pool, id, &self.currency, &self.root).await?) 147 } 148 149 async fn outgoing_history(&self, params: History) -> ApiResult<OutgoingHistory> { 150 Ok(OutgoingHistory { 151 outgoing_transactions: db::outgoing_history( 152 &self.pool, 153 ¶ms, 154 &self.currency, 155 &self.root, 156 || self.taler_out_channel.subscribe(), 157 ) 158 .await?, 159 debit_account: self.payto.clone(), 160 }) 161 } 162 163 async fn incoming_history(&self, params: History) -> ApiResult<IncomingHistory> { 164 Ok(IncomingHistory { 165 incoming_transactions: db::incoming_history( 166 &self.pool, 167 ¶ms, 168 &self.currency, 169 &self.root, 170 || self.taler_in_channel.subscribe(), 171 ) 172 .await?, 173 credit_account: self.payto.clone(), 174 }) 175 } 176 177 async fn add_incoming_reserve( 178 &self, 179 req: AddIncomingRequest, 180 ) -> ApiResult<AddIncomingResponse> { 181 let debtor = FullCyclosPayto::try_from(&req.debit_account)?; 182 let res = db::register_tx_in_admin( 183 &self.pool, 184 &TxInAdmin { 185 amount: req.amount.decimal(), 186 subject: format!("Admin incoming {}", req.reserve_pub), 187 debtor_id: *debtor.id, 188 debtor_name: debtor.name, 189 metadata: IncomingSubject::Reserve(req.reserve_pub), 190 }, 191 &Timestamp::now(), 192 ) 193 .await?; 194 match res { 195 AddIncomingResult::Success { 196 row_id, valued_at, .. 197 } => Ok(AddIncomingResponse { 198 row_id, 199 timestamp: valued_at.into(), 200 }), 201 AddIncomingResult::ReservePubReuse => { 202 Err(failure_code(ErrorCode::BANK_DUPLICATE_RESERVE_PUB_SUBJECT)) 203 } 204 AddIncomingResult::UnknownMapping | AddIncomingResult::MappingReuse => { 205 unreachable!("mapping unused") 206 } 207 } 208 } 209 210 async fn add_incoming_kyc(&self, req: AddKycauthRequest) -> ApiResult<AddIncomingResponse> { 211 let debtor = FullCyclosPayto::try_from(&req.debit_account)?; 212 let res = db::register_tx_in_admin( 213 &self.pool, 214 &TxInAdmin { 215 amount: req.amount.decimal(), 216 subject: format!("Admin incoming KYC:{}", req.account_pub), 217 debtor_id: *debtor.id, 218 debtor_name: debtor.name, 219 metadata: IncomingSubject::Kyc(req.account_pub), 220 }, 221 &Timestamp::now(), 222 ) 223 .await?; 224 match res { 225 AddIncomingResult::Success { 226 row_id, valued_at, .. 227 } => Ok(AddIncomingResponse { 228 row_id, 229 timestamp: valued_at.into(), 230 }), 231 AddIncomingResult::ReservePubReuse => { 232 Err(failure_code(ErrorCode::BANK_DUPLICATE_RESERVE_PUB_SUBJECT)) 233 } 234 AddIncomingResult::UnknownMapping | AddIncomingResult::MappingReuse => { 235 unreachable!("mapping unused") 236 } 237 } 238 } 239 240 async fn add_incoming_mapped(&self, req: AddMappedRequest) -> ApiResult<AddIncomingResponse> { 241 let debtor = FullCyclosPayto::try_from(&req.debit_account)?; 242 let res = db::register_tx_in_admin( 243 &self.pool, 244 &TxInAdmin { 245 amount: req.amount.decimal(), 246 subject: format!("Admin incoming MAP:{}", req.authorization_pub), 247 debtor_id: *debtor.id, 248 debtor_name: debtor.name, 249 metadata: IncomingSubject::Map(req.authorization_pub), 250 }, 251 &Timestamp::now(), 252 ) 253 .await?; 254 match res { 255 AddIncomingResult::Success { 256 row_id, valued_at, .. 257 } => Ok(AddIncomingResponse { 258 row_id, 259 timestamp: valued_at.into(), 260 }), 261 AddIncomingResult::ReservePubReuse => { 262 Err(failure_code(ErrorCode::BANK_DUPLICATE_RESERVE_PUB_SUBJECT)) 263 } 264 AddIncomingResult::UnknownMapping => { 265 Err(failure_code(ErrorCode::BANK_TRANSFER_MAPPING_UNKNOWN)) 266 } 267 AddIncomingResult::MappingReuse => { 268 Err(failure_code(ErrorCode::BANK_TRANSFER_MAPPING_REUSED)) 269 } 270 } 271 } 272 273 fn support_account_check(&self) -> bool { 274 false 275 } 276 } 277 278 impl Revenue for CyclosApi { 279 async fn history(&self, params: History) -> ApiResult<RevenueIncomingHistory> { 280 Ok(RevenueIncomingHistory { 281 incoming_transactions: db::revenue_history( 282 &self.pool, 283 ¶ms, 284 &self.currency, 285 &self.root, 286 || self.in_channel.subscribe(), 287 ) 288 .await?, 289 credit_account: self.payto.clone(), 290 }) 291 } 292 } 293 294 impl PreparedTransfer for CyclosApi { 295 fn supported_formats(&self) -> &[SubjectFormat] { 296 &[SubjectFormat::SIMPLE] 297 } 298 299 async fn registration(&self, req: RegistrationRequest) -> ApiResult<RegistrationResponse> { 300 match db::transfer_register(&self.pool, &req).await? { 301 db::RegistrationResult::Success => { 302 let simple = TransferSubject::Simple { 303 credit_amount: req.credit_amount, 304 subject: if req.authorization_pub == req.account_pub && !req.recurrent { 305 fmt_in_subject(req.r#type.into(), &req.account_pub).to_string() 306 } else { 307 fmt_in_subject(IncomingType::map, &req.authorization_pub).to_string() 308 }, 309 }; 310 ApiResult::Ok(RegistrationResponse { 311 subjects: vec![simple], 312 expiration: TalerTimestamp::Never, 313 }) 314 } 315 db::RegistrationResult::ReservePubReuse => { 316 ApiResult::Err(failure_code(ErrorCode::BANK_DUPLICATE_RESERVE_PUB_SUBJECT)) 317 } 318 } 319 } 320 321 async fn unregistration(&self, req: Unregistration) -> ApiResult<bool> { 322 Ok(db::transfer_unregister(&self.pool, &req).await?) 323 } 324 } 325 326 #[cfg(test)] 327 mod test { 328 use std::sync::{ 329 Arc, LazyLock, 330 atomic::{AtomicI64, Ordering}, 331 }; 332 333 use compact_str::CompactString; 334 use jiff::Timestamp; 335 use sqlx::{PgPool, Row as _, postgres::PgRow}; 336 use taler_api::{ 337 api::TalerRouter as _, 338 auth::AuthMethod, 339 db::TypeHelper as _, 340 subject::{IncomingSubject, OutgoingSubject}, 341 }; 342 use taler_common::{ 343 api::{ 344 EddsaPublicKey, 345 prepared::PreparedTransferConfig, 346 revenue::RevenueConfig, 347 wire::{TransferState, WireConfig}, 348 }, 349 db::IncomingType, 350 types::{ 351 amount::{Currency, decimal}, 352 payto::{PaytoURI, payto}, 353 }, 354 }; 355 use taler_test_utils::{ 356 Router, 357 db::db_test_setup, 358 routine::{ 359 Status, admin_add_incoming_routine, in_history_routine, out_history_routine, 360 registration_routine, revenue_routine, transfer_routine, 361 }, 362 server::TestServer as _, 363 tasks, 364 }; 365 366 use crate::{ 367 api::CyclosApi, 368 constants::CONFIG_SOURCE, 369 db::{self, TxIn, TxOutKind}, 370 }; 371 372 static ACCOUNT: LazyLock<PaytoURI> = 373 LazyLock::new(|| payto("payto://cyclos/localhost/7762070814178012479?receiver-name=name")); 374 375 async fn setup() -> (Router, PgPool) { 376 let (_, pool) = db_test_setup(CONFIG_SOURCE).await; 377 let api = Arc::new(CyclosApi::start( 378 pool.clone(), 379 CompactString::const_new("localhost"), 380 ACCOUNT.clone(), 381 Currency::TEST, 382 )); 383 let server = Router::new() 384 .wire_gateway(api.clone(), AuthMethod::None) 385 .prepared_transfer(api.clone()) 386 .revenue(api, AuthMethod::None) 387 .finalize(); 388 389 (server, pool) 390 } 391 392 #[tokio::test] 393 async fn config() { 394 let (server, _) = setup().await; 395 server 396 .get("/taler-wire-gateway/config") 397 .await 398 .assert_ok_json::<WireConfig>(); 399 server 400 .get("/taler-prepared-transfer/config") 401 .await 402 .assert_ok_json::<PreparedTransferConfig>(); 403 server 404 .get("/taler-revenue/config") 405 .await 406 .assert_ok_json::<RevenueConfig>(); 407 } 408 409 #[tokio::test] 410 async fn transfer() { 411 let (server, _) = setup().await; 412 transfer_routine(&server, TransferState::pending, &ACCOUNT).await; 413 } 414 415 static CODE: AtomicI64 = AtomicI64::new(0); 416 417 async fn r#in(db: &PgPool, subject: Option<IncomingSubject>) { 418 let now = Timestamp::now(); 419 db::register_tx_in( 420 &mut db.acquire().await.unwrap(), 421 &TxIn { 422 transfer_id: CODE.fetch_add(1, Ordering::Relaxed), 423 tx_id: None, 424 amount: decimal("10"), 425 subject: "subject".to_owned(), 426 debtor_id: 31000163100000000, 427 debtor_name: "Name".into(), 428 valued_at: Timestamp::now(), 429 }, 430 &subject, 431 &now, 432 ) 433 .await 434 .unwrap(); 435 } 436 437 async fn in_malformed(db: &PgPool) { 438 r#in(db, None).await 439 } 440 441 async fn in_talerable(db: &PgPool) { 442 r#in(db, Some(IncomingSubject::Reserve(EddsaPublicKey::rand()))).await 443 } 444 445 async fn out(db: &PgPool, kind: &TxOutKind) { 446 let i = CODE.fetch_add(1, Ordering::Relaxed); 447 let now = Timestamp::now(); 448 db::register_tx_out( 449 &mut db.acquire().await.unwrap(), 450 &db::TxOut { 451 transfer_id: i, 452 tx_id: if i % 2 == 0 { Some(i % 2) } else { None }, 453 amount: decimal("10"), 454 subject: "subject".to_owned(), 455 creditor_id: 31000163100000000, 456 creditor_name: "Name".into(), 457 valued_at: now, 458 }, 459 kind, 460 &now, 461 ) 462 .await 463 .unwrap(); 464 } 465 466 async fn out_talerable(db: &PgPool) { 467 out(db, &TxOutKind::Talerable(OutgoingSubject::rand())).await 468 } 469 470 async fn out_bounce(db: &PgPool) { 471 out(db, &TxOutKind::Bounce(CODE.load(Ordering::Relaxed))).await 472 } 473 474 async fn out_malformed(db: &PgPool) { 475 out(db, &TxOutKind::Simple).await 476 } 477 478 #[tokio::test] 479 async fn outgoing_history() { 480 let (server, db) = &setup().await; 481 482 out_history_routine( 483 server, 484 tasks!({ out_talerable(db).await }), 485 tasks!( 486 { out_bounce(db).await }, 487 { out_malformed(db).await }, 488 { in_malformed(db).await }, 489 { in_talerable(db).await } 490 ), 491 ) 492 .await; 493 } 494 495 #[tokio::test] 496 async fn admin_add_incoming() { 497 let (server, _) = setup().await; 498 admin_add_incoming_routine(&server, &ACCOUNT, true).await; 499 } 500 501 #[tokio::test] 502 async fn in_history() { 503 let (server, db) = &setup().await; 504 in_history_routine( 505 server, 506 &ACCOUNT, 507 true, 508 tasks!({ in_talerable(db).await }), 509 tasks!( 510 { out_malformed(db).await }, 511 { out_talerable(db).await }, 512 { out_bounce(db).await }, 513 { in_malformed(db).await } 514 ), 515 ) 516 .await; 517 } 518 519 #[tokio::test] 520 async fn revenue() { 521 let (server, db) = &setup().await; 522 revenue_routine( 523 server, 524 &ACCOUNT, 525 true, 526 tasks!({ in_malformed(db).await }, { in_talerable(db).await },), 527 tasks!({ out_malformed(db).await }, { out_talerable(db).await }, { 528 out_bounce(db).await 529 }), 530 ) 531 .await; 532 } 533 534 async fn check_in(pool: &PgPool) -> Vec<Status> { 535 sqlx::query( 536 " 537 SELECT pending_recurrent_in.authorization_pub IS NOT NULL, bounced.tx_in_id IS NOT NULL, type, metadata 538 FROM tx_in 539 LEFT JOIN taler_in USING (tx_in_id) 540 LEFT JOIN pending_recurrent_in USING (tx_in_id) 541 LEFT JOIN bounced USING (tx_in_id) 542 ORDER BY tx_in.tx_in_id 543 ", 544 ) 545 .try_map(|r: PgRow| { 546 Ok( 547 if r.try_get_flag(0)? { 548 Status::Pending 549 } else if r.try_get_flag(1)? { 550 Status::Bounced 551 } else { 552 match r.try_get(2)? { 553 None => Status::Simple, 554 Some(IncomingType::reserve) => Status::Reserve(r.try_get(3)?), 555 Some(IncomingType::kyc) => Status::Kyc(r.try_get(3)?), 556 Some(e) => unreachable!("{e:?}") 557 } 558 } 559 ) 560 }) 561 .fetch_all(pool) 562 .await 563 .unwrap() 564 } 565 566 #[tokio::test] 567 async fn registration() { 568 let (server, pool) = setup().await; 569 registration_routine(&server, &ACCOUNT, || check_in(&pool)).await; 570 } 571 }