api.rs (16766B)
1 /* 2 This file is part of TALER 3 Copyright (C) 2025, 2026 Taler Systems SA 4 5 TALER is free software; you can redistribute it and/or modify it under the 6 terms of the GNU Affero General Public License as published by the Free Software 7 Foundation; either version 3, or (at your option) any later version. 8 9 TALER is distributed in the hope that it will be useful, but WITHOUT ANY 10 WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR 11 A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. 12 13 You should have received a copy of the GNU Affero General Public License along with 14 TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> 15 */ 16 17 use compact_str::CompactString; 18 use jiff::Timestamp; 19 use taler_api::{ 20 api::{TalerApi, revenue::Revenue, transfer::WireTransferGateway, wire::WireGateway}, 21 error::{ApiResult, failure, failure_code}, 22 subject::{IncomingSubject, fmt_in_subject}, 23 }; 24 use taler_common::{ 25 api_common::{SafeU64, safe_u64}, 26 api_params::{History, Page}, 27 api_revenue::RevenueIncomingHistory, 28 api_transfer::{ 29 RegistrationRequest, RegistrationResponse, SubjectFormat, TransferSubject, Unregistration, 30 }, 31 api_wire::{ 32 AddIncomingRequest, AddIncomingResponse, AddKycauthRequest, AddKycauthResponse, 33 IncomingHistory, OutgoingHistory, TransferList, TransferRequest, TransferResponse, 34 TransferState, TransferStatus, 35 }, 36 db::IncomingType, 37 error_code::ErrorCode, 38 types::{amount::Currency, payto::PaytoURI, timestamp::TalerTimestamp}, 39 }; 40 use tokio::sync::watch::Sender; 41 42 use crate::{ 43 db::{self, AddIncomingResult, Transfer, TxInAdmin}, 44 payto::FullCyclosPayto, 45 }; 46 47 pub struct CyclosApi { 48 pub pool: sqlx::PgPool, 49 pub currency: Currency, 50 pub payto: PaytoURI, 51 pub in_channel: Sender<i64>, 52 pub taler_in_channel: Sender<i64>, 53 pub out_channel: Sender<i64>, 54 pub taler_out_channel: Sender<i64>, 55 pub root: CompactString, 56 } 57 58 impl CyclosApi { 59 pub async fn start( 60 pool: sqlx::PgPool, 61 root: CompactString, 62 payto: PaytoURI, 63 currency: Currency, 64 ) -> Self { 65 let in_channel = Sender::new(0); 66 let taler_in_channel = Sender::new(0); 67 let out_channel = Sender::new(0); 68 let taler_out_channel = Sender::new(0); 69 let tmp = Self { 70 pool: pool.clone(), 71 payto, 72 currency, 73 root, 74 in_channel: in_channel.clone(), 75 taler_in_channel: taler_in_channel.clone(), 76 out_channel: out_channel.clone(), 77 taler_out_channel: taler_out_channel.clone(), 78 }; 79 tokio::spawn(db::notification_listener( 80 pool, 81 in_channel, 82 taler_in_channel, 83 out_channel, 84 taler_out_channel, 85 )); 86 tmp 87 } 88 } 89 90 impl TalerApi for CyclosApi { 91 fn currency(&self) -> &str { 92 self.currency.as_ref() 93 } 94 95 fn implementation(&self) -> &'static str { 96 "urn:net:taler:specs:taler-cyclos:taler-rust" 97 } 98 } 99 100 impl WireGateway for CyclosApi { 101 async fn transfer(&self, req: TransferRequest) -> ApiResult<TransferResponse> { 102 let creditor = FullCyclosPayto::try_from(&req.credit_account)?; 103 let result = db::make_transfer( 104 &self.pool, 105 &Transfer { 106 request_uid: req.request_uid, 107 amount: req.amount.decimal(), 108 exchange_base_url: req.exchange_base_url, 109 metadata: req.metadata, 110 wtid: req.wtid, 111 creditor_id: *creditor.id, 112 creditor_name: creditor.name, 113 }, 114 &Timestamp::now(), 115 ) 116 .await?; 117 match result { 118 db::TransferResult::Success { id, initiated_at } => Ok(TransferResponse { 119 timestamp: initiated_at.into(), 120 row_id: SafeU64::try_from(id).unwrap(), 121 }), 122 db::TransferResult::RequestUidReuse => { 123 Err(failure_code(ErrorCode::BANK_TRANSFER_REQUEST_UID_REUSED)) 124 } 125 db::TransferResult::WtidReuse => { 126 Err(failure_code(ErrorCode::BANK_TRANSFER_WTID_REUSED)) 127 } 128 } 129 } 130 131 async fn transfer_page( 132 &self, 133 page: Page, 134 status: Option<TransferState>, 135 ) -> ApiResult<TransferList> { 136 Ok(TransferList { 137 transfers: db::transfer_page(&self.pool, &status, &self.currency, &self.root, &page) 138 .await?, 139 debit_account: self.payto.clone(), 140 }) 141 } 142 143 async fn transfer_by_id(&self, id: u64) -> ApiResult<Option<TransferStatus>> { 144 Ok(db::transfer_by_id(&self.pool, id, &self.currency, &self.root).await?) 145 } 146 147 async fn outgoing_history(&self, params: History) -> ApiResult<OutgoingHistory> { 148 Ok(OutgoingHistory { 149 outgoing_transactions: db::outgoing_history( 150 &self.pool, 151 ¶ms, 152 &self.currency, 153 &self.root, 154 || self.taler_out_channel.subscribe(), 155 ) 156 .await?, 157 debit_account: self.payto.clone(), 158 }) 159 } 160 161 async fn incoming_history(&self, params: History) -> ApiResult<IncomingHistory> { 162 Ok(IncomingHistory { 163 incoming_transactions: db::incoming_history( 164 &self.pool, 165 ¶ms, 166 &self.currency, 167 &self.root, 168 || self.taler_in_channel.subscribe(), 169 ) 170 .await?, 171 credit_account: self.payto.clone(), 172 }) 173 } 174 175 async fn add_incoming_reserve( 176 &self, 177 req: AddIncomingRequest, 178 ) -> ApiResult<AddIncomingResponse> { 179 let debtor = FullCyclosPayto::try_from(&req.debit_account)?; 180 let res = db::register_tx_in_admin( 181 &self.pool, 182 &TxInAdmin { 183 amount: req.amount.decimal(), 184 subject: format!("Admin incoming {}", req.reserve_pub), 185 debtor_id: *debtor.id, 186 debtor_name: debtor.name, 187 metadata: IncomingSubject::Reserve(req.reserve_pub), 188 }, 189 &Timestamp::now(), 190 ) 191 .await?; 192 match res { 193 AddIncomingResult::Success { 194 row_id, valued_at, .. 195 } => Ok(AddIncomingResponse { 196 row_id: safe_u64(row_id), 197 timestamp: valued_at.into(), 198 }), 199 AddIncomingResult::ReservePubReuse => { 200 Err(failure_code(ErrorCode::BANK_DUPLICATE_RESERVE_PUB_SUBJECT)) 201 } 202 AddIncomingResult::UnknownMapping | AddIncomingResult::MappingReuse => { 203 unreachable!("mapping unused") 204 } 205 } 206 } 207 208 async fn add_incoming_kyc(&self, req: AddKycauthRequest) -> ApiResult<AddKycauthResponse> { 209 let debtor = FullCyclosPayto::try_from(&req.debit_account)?; 210 let res = db::register_tx_in_admin( 211 &self.pool, 212 &TxInAdmin { 213 amount: req.amount.decimal(), 214 subject: format!("Admin incoming KYC:{}", req.account_pub), 215 debtor_id: *debtor.id, 216 debtor_name: debtor.name, 217 metadata: IncomingSubject::Kyc(req.account_pub), 218 }, 219 &Timestamp::now(), 220 ) 221 .await?; 222 match res { 223 AddIncomingResult::Success { 224 row_id, valued_at, .. 225 } => Ok(AddKycauthResponse { 226 row_id: safe_u64(row_id), 227 timestamp: valued_at.into(), 228 }), 229 AddIncomingResult::ReservePubReuse => { 230 Err(failure_code(ErrorCode::BANK_DUPLICATE_RESERVE_PUB_SUBJECT)) 231 } 232 AddIncomingResult::UnknownMapping | AddIncomingResult::MappingReuse => { 233 unreachable!("mapping unused") 234 } 235 } 236 } 237 238 fn support_account_check(&self) -> bool { 239 false 240 } 241 } 242 243 impl Revenue for CyclosApi { 244 async fn history(&self, params: History) -> ApiResult<RevenueIncomingHistory> { 245 Ok(RevenueIncomingHistory { 246 incoming_transactions: db::revenue_history( 247 &self.pool, 248 ¶ms, 249 &self.currency, 250 &self.root, 251 || self.in_channel.subscribe(), 252 ) 253 .await?, 254 credit_account: self.payto.clone(), 255 }) 256 } 257 } 258 259 impl WireTransferGateway for CyclosApi { 260 fn supported_formats(&self) -> &[SubjectFormat] { 261 &[SubjectFormat::SIMPLE] 262 } 263 264 async fn registration(&self, req: RegistrationRequest) -> ApiResult<RegistrationResponse> { 265 match db::transfer_register(&self.pool, &req).await? { 266 db::RegistrationResult::Success => { 267 let simple = TransferSubject::Simple { 268 credit_amount: req.credit_amount, 269 subject: if req.authorization_pub == req.account_pub && !req.recurrent { 270 fmt_in_subject(req.r#type.into(), &req.account_pub) 271 } else { 272 fmt_in_subject(IncomingType::map, &req.authorization_pub) 273 }, 274 }; 275 ApiResult::Ok(RegistrationResponse { 276 subjects: vec![simple], 277 expiration: TalerTimestamp::Never, 278 }) 279 } 280 db::RegistrationResult::ReservePubReuse => { 281 ApiResult::Err(failure_code(ErrorCode::BANK_DUPLICATE_RESERVE_PUB_SUBJECT)) 282 } 283 } 284 } 285 286 async fn unregistration(&self, req: Unregistration) -> ApiResult<()> { 287 if !db::transfer_unregister(&self.pool, &req).await? { 288 Err(failure( 289 ErrorCode::BANK_TRANSACTION_NOT_FOUND, 290 format!("Prepared transfer '{}' not found", req.authorization_pub), 291 )) 292 } else { 293 Ok(()) 294 } 295 } 296 } 297 298 #[cfg(test)] 299 mod test { 300 use std::{ 301 str::FromStr as _, 302 sync::{Arc, LazyLock}, 303 }; 304 305 use compact_str::CompactString; 306 use jiff::Timestamp; 307 use sqlx::{PgPool, Row as _, postgres::PgRow}; 308 use taler_api::{ 309 api::TalerRouter as _, 310 auth::AuthMethod, 311 db::TypeHelper as _, 312 subject::{IncomingSubject, OutgoingSubject}, 313 }; 314 use taler_common::{ 315 api_common::EddsaPublicKey, 316 api_revenue::RevenueConfig, 317 api_transfer::WireTransferConfig, 318 api_wire::{OutgoingHistory, TransferState, WireConfig}, 319 db::IncomingType, 320 types::{ 321 amount::{Currency, decimal}, 322 payto::{PaytoURI, payto}, 323 }, 324 }; 325 use taler_test_utils::{ 326 Router, 327 db::db_test_setup, 328 routine::{ 329 Status, admin_add_incoming_routine, registration_routine, revenue_routine, 330 routine_pagination, transfer_routine, 331 }, 332 server::TestServer as _, 333 }; 334 335 use crate::{ 336 api::CyclosApi, 337 constants::CONFIG_SOURCE, 338 db::{self, AddIncomingResult, TxIn, TxOutKind}, 339 }; 340 341 static ACCOUNT: LazyLock<PaytoURI> = 342 LazyLock::new(|| payto("payto://cyclos/localhost/7762070814178012479?receiver-name=name")); 343 344 async fn setup() -> (Router, PgPool) { 345 let (_, pool) = db_test_setup(CONFIG_SOURCE).await; 346 let api = Arc::new( 347 CyclosApi::start( 348 pool.clone(), 349 CompactString::const_new("localhost"), 350 ACCOUNT.clone(), 351 Currency::from_str("TEST").unwrap(), 352 ) 353 .await, 354 ); 355 let server = Router::new() 356 .wire_gateway(api.clone(), AuthMethod::None) 357 .wire_transfer_gateway(api.clone()) 358 .revenue(api, AuthMethod::None) 359 .finalize(); 360 361 (server, pool) 362 } 363 364 #[tokio::test] 365 async fn config() { 366 let (server, _) = setup().await; 367 server 368 .get("/taler-wire-gateway/config") 369 .await 370 .assert_ok_json::<WireConfig>(); 371 server 372 .get("/taler-wire-transfer-gateway/config") 373 .await 374 .assert_ok_json::<WireTransferConfig>(); 375 server 376 .get("/taler-revenue/config") 377 .await 378 .assert_ok_json::<RevenueConfig>(); 379 } 380 381 #[tokio::test] 382 async fn transfer() { 383 let (server, _) = setup().await; 384 transfer_routine(&server, TransferState::pending, &ACCOUNT).await; 385 } 386 387 #[tokio::test] 388 async fn outgoing_history() { 389 let (server, pool) = setup().await; 390 routine_pagination::<OutgoingHistory, _>( 391 &server, 392 "/taler-wire-gateway/history/outgoing", 393 async |i| { 394 db::register_tx_out( 395 &mut pool.acquire().await.unwrap(), 396 &db::TxOut { 397 transfer_id: i as i64, 398 tx_id: if i % 2 == 0 { 399 Some((i % 2) as i64) 400 } else { 401 None 402 }, 403 amount: decimal("10"), 404 subject: "subject".to_owned(), 405 creditor_id: 31000163100000000, 406 creditor_name: "Name".into(), 407 valued_at: Timestamp::now(), 408 }, 409 &TxOutKind::Talerable(OutgoingSubject::rand()), 410 &Timestamp::now(), 411 ) 412 .await 413 .unwrap(); 414 }, 415 ) 416 .await; 417 } 418 419 #[tokio::test] 420 async fn admin_add_incoming() { 421 let (server, _) = setup().await; 422 admin_add_incoming_routine(&server, &ACCOUNT, true).await; 423 } 424 425 #[tokio::test] 426 async fn revenue() { 427 let (server, _) = setup().await; 428 revenue_routine(&server, &ACCOUNT, true).await; 429 } 430 431 async fn check_in(pool: &PgPool) -> Vec<Status> { 432 sqlx::query( 433 " 434 SELECT pending_recurrent_in.authorization_pub IS NOT NULL, bounced.tx_in_id IS NOT NULL, type, metadata 435 FROM tx_in 436 LEFT JOIN taler_in USING (tx_in_id) 437 LEFT JOIN pending_recurrent_in USING (tx_in_id) 438 LEFT JOIN bounced USING (tx_in_id) 439 ORDER BY tx_in.tx_in_id 440 ", 441 ) 442 .try_map(|r: PgRow| { 443 Ok( 444 if r.try_get_flag(0)? { 445 Status::Pending 446 } else if r.try_get_flag(1)? { 447 Status::Bounced 448 } else { 449 match r.try_get(2)? { 450 None => Status::Simple, 451 Some(IncomingType::reserve) => Status::Reserve(r.try_get(3)?), 452 Some(IncomingType::kyc) => Status::Kyc(r.try_get(3)?), 453 Some(e) => unreachable!("{e:?}") 454 } 455 } 456 ) 457 }) 458 .fetch_all(pool) 459 .await 460 .unwrap() 461 } 462 463 pub async fn test_in(pool: &PgPool, key: EddsaPublicKey) { 464 let tx = TxIn { 465 transfer_id: rand::random_range(10..10000), 466 tx_id: None, 467 amount: decimal("12"), 468 subject: String::new(), 469 debtor_id: rand::random_range(10..10000), 470 debtor_name: "Name".into(), 471 valued_at: Timestamp::now(), 472 }; 473 let mut db = pool.acquire().await.unwrap(); 474 let reason = match db::register_tx_in( 475 &mut db, 476 &tx, 477 &Some(IncomingSubject::Map(key)), 478 &Timestamp::now(), 479 ) 480 .await 481 .unwrap() 482 { 483 AddIncomingResult::Success { .. } => return, 484 AddIncomingResult::ReservePubReuse => "reserve pub reuse", 485 AddIncomingResult::UnknownMapping => "unknown mapping", 486 AddIncomingResult::MappingReuse => "mapping reuse", 487 }; 488 db::register_bounced_tx_in( 489 &mut db, 490 &tx, 491 rand::random_range(10..10000), 492 reason, 493 &Timestamp::now(), 494 ) 495 .await 496 .unwrap(); 497 } 498 499 #[tokio::test] 500 async fn registration() { 501 let (server, pool) = setup().await; 502 registration_routine( 503 &server, 504 &ACCOUNT, 505 || check_in(&pool), 506 |account_pub| { 507 let account_pub = account_pub.clone(); 508 let pool = &pool; 509 async move { test_in(pool, account_pub).await } 510 }, 511 ) 512 .await; 513 } 514 }