db.rs (12593B)
1 /* 2 This file is part of TALER 3 Copyright (C) 2024, 2025, 2026 Taler Systems SA 4 5 TALER is free software; you can redistribute it and/or modify it under the 6 terms of the GNU Affero General Public License as published by the Free Software 7 Foundation; either version 3, or (at your option) any later version. 8 9 TALER is distributed in the hope that it will be useful, but WITHOUT ANY 10 WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR 11 A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. 12 13 You should have received a copy of the GNU Affero General Public License along with 14 TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> 15 */ 16 17 use jiff::Timestamp; 18 use sqlx::{PgPool, QueryBuilder, Row, postgres::PgRow}; 19 use taler_common::{ 20 api::{ 21 EddsaPublicKey, 22 params::{History, Page}, 23 prepared::{RegistrationRequest, Unregistration}, 24 revenue::RevenueIncomingBankTransaction, 25 wire::{ 26 IncomingBankTransaction, OutgoingBankTransaction, TransferListStatus, TransferRequest, 27 TransferResponse, TransferState, TransferStatus, 28 }, 29 }, 30 db::IncomingType, 31 types::{ 32 amount::{Amount, Currency}, 33 payto::PaytoURI, 34 }, 35 }; 36 use tokio::sync::watch::{Receiver, Sender}; 37 38 use crate::{ 39 db::{BindHelper as _, TypeHelper as _, history, page}, 40 notification_listener, serialized, 41 subject::fmt_out_subject, 42 }; 43 44 pub async fn notification_listener( 45 pool: PgPool, 46 outgoing_channel: Sender<i64>, 47 incoming_channel: Sender<i64>, 48 ) { 49 notification_listener!(&pool, 50 "outgoing_tx" => (row_id: i64) { 51 outgoing_channel.send_replace(row_id); 52 }, 53 "incoming_tx" => (row_id: i64) { 54 incoming_channel.send_replace(row_id); 55 } 56 ) 57 } 58 59 pub enum TransferResult { 60 Success(TransferResponse), 61 RequestUidReuse, 62 WtidReuse, 63 } 64 65 pub async fn transfer(db: &PgPool, req: &TransferRequest) -> sqlx::Result<TransferResult> { 66 let subject = fmt_out_subject(&req.wtid, &req.exchange_base_url, req.metadata.as_deref()); 67 serialized!( 68 sqlx::query( 69 " 70 SELECT out_request_uid_reuse, out_wtid_reuse, out_transfer_row_id, out_created_at 71 FROM taler_transfer($1,$2,$3,$4,$5,$6,$7,$8) 72 ", 73 ) 74 .bind(req.amount) 75 .bind(req.exchange_base_url.as_str()) 76 .bind(&req.metadata) 77 .bind(&subject) 78 .bind(req.credit_account.raw()) 79 .bind(&req.request_uid) 80 .bind(&req.wtid) 81 .bind_timestamp(&Timestamp::now()) 82 .try_map(|r: PgRow| { 83 Ok(if r.try_get_flag("out_request_uid_reuse")? { 84 TransferResult::RequestUidReuse 85 } else if r.try_get_flag("out_wtid_reuse")? { 86 TransferResult::WtidReuse 87 } else { 88 TransferResult::Success(TransferResponse { 89 row_id: r.try_get_u64("out_transfer_row_id")?, 90 timestamp: r.try_get_timestamp("out_created_at")?.into(), 91 }) 92 }) 93 }) 94 .fetch_one(db) 95 ) 96 } 97 98 pub async fn transfer_page( 99 db: &PgPool, 100 status: &Option<TransferState>, 101 params: &Page, 102 currency: &Currency, 103 ) -> sqlx::Result<Vec<TransferListStatus>> { 104 page( 105 db, 106 params, 107 "transfer_id", 108 || { 109 let mut builder = QueryBuilder::new( 110 " 111 SELECT 112 transfer_id, 113 status, 114 amount, 115 credit_payto, 116 created_at 117 FROM transfer 118 JOIN tx_out USING (tx_out_id) 119 WHERE 120 ", 121 ); 122 if let Some(status) = status { 123 builder.push(" status = ").push_bind(status).push(" AND "); 124 } 125 builder 126 }, 127 |r: PgRow| { 128 Ok(TransferListStatus { 129 row_id: r.try_get_u64("transfer_id")?, 130 status: r.try_get("status")?, 131 amount: r.try_get_amount("amount", currency)?, 132 credit_account: r.try_get_payto("credit_payto")?, 133 timestamp: r.try_get_timestamp("created_at")?.into(), 134 }) 135 }, 136 ) 137 .await 138 } 139 140 pub async fn transfer_by_id( 141 db: &PgPool, 142 id: u64, 143 currency: &Currency, 144 ) -> sqlx::Result<Option<TransferStatus>> { 145 serialized!( 146 sqlx::query( 147 " 148 SELECT 149 status, 150 status_msg, 151 amount, 152 exchange_base_url, 153 metadata, 154 wtid, 155 credit_payto, 156 created_at 157 FROM transfer 158 JOIN tx_out USING (tx_out_id) 159 WHERE transfer_id = $1 160 ", 161 ) 162 .bind(id as i64) 163 .try_map(|r: PgRow| { 164 Ok(TransferStatus { 165 status: r.try_get("status")?, 166 status_msg: r.try_get("status_msg")?, 167 amount: r.try_get_amount("amount", currency)?, 168 origin_exchange_url: r.try_get("exchange_base_url")?, 169 metadata: r.try_get("metadata")?, 170 wtid: r.try_get("wtid")?, 171 credit_account: r.try_get_payto("credit_payto")?, 172 timestamp: r.try_get_timestamp("created_at")?.into(), 173 }) 174 }) 175 .fetch_optional(db) 176 ) 177 } 178 179 pub async fn outgoing_revenue( 180 db: &PgPool, 181 params: &History, 182 currency: &Currency, 183 listen: impl FnOnce() -> Receiver<i64>, 184 ) -> sqlx::Result<Vec<OutgoingBankTransaction>> { 185 history( 186 db, 187 "transfer_id", 188 params, 189 listen, 190 || { 191 QueryBuilder::new( 192 " 193 SELECT 194 transfer_id, 195 amount, 196 exchange_base_url, 197 metadata, 198 wtid, 199 credit_payto, 200 created_at 201 FROM transfer 202 JOIN tx_out USING (tx_out_id) 203 WHERE status = 'success' AND 204 ", 205 ) 206 }, 207 |r| { 208 Ok(OutgoingBankTransaction { 209 amount: r.try_get_amount("amount", currency)?, 210 debit_fee: None, 211 wtid: r.try_get("wtid")?, 212 credit_account: r.try_get_payto("credit_payto")?, 213 row_id: r.try_get_u64("transfer_id")?, 214 date: r.try_get_timestamp("created_at")?.into(), 215 exchange_base_url: r.try_get_url("exchange_base_url")?, 216 metadata: r.try_get("metadata")?, 217 }) 218 }, 219 ) 220 .await 221 } 222 223 pub enum AddIncomingResult { 224 Success { id: u64, created_at: Timestamp }, 225 ReservePubReuse, 226 UnknownMapping, 227 MappingReuse, 228 } 229 230 pub async fn add_incoming( 231 db: &PgPool, 232 amount: &Amount, 233 debit_account: &PaytoURI, 234 subject: &str, 235 timestamp: &Timestamp, 236 ty: IncomingType, 237 account_pub: &EddsaPublicKey, 238 ) -> sqlx::Result<AddIncomingResult> { 239 serialized!( 240 sqlx::query( 241 "SELECT out_reserve_pub_reuse, out_mapping_reuse, out_unknown_mapping, out_tx_row_id, out_created_at 242 FROM add_incoming($1,$2,$3,$4,$5,$6)", 243 ) 244 .bind(amount) 245 .bind(subject) 246 .bind(debit_account.raw()) 247 .bind(ty) 248 .bind(account_pub) 249 .bind_timestamp(timestamp) 250 .try_map(|r: PgRow| { 251 Ok(if r.try_get_flag("out_reserve_pub_reuse")? { 252 AddIncomingResult::ReservePubReuse 253 } else if r.try_get_flag("out_mapping_reuse")? { 254 AddIncomingResult::MappingReuse 255 } else if r.try_get_flag("out_unknown_mapping")? { 256 AddIncomingResult::UnknownMapping 257 } else{ 258 AddIncomingResult::Success { 259 id: r.try_get_u64("out_tx_row_id")?, 260 created_at: r.try_get_timestamp("out_created_at")?, 261 } 262 }) 263 }) 264 .fetch_one(db) 265 ) 266 } 267 268 pub async fn incoming_history( 269 db: &PgPool, 270 params: &History, 271 currency: &Currency, 272 listen: impl FnOnce() -> Receiver<i64>, 273 ) -> sqlx::Result<Vec<IncomingBankTransaction>> { 274 history( 275 db, 276 "tx_in_id", 277 params, 278 listen, 279 || { 280 QueryBuilder::new( 281 " 282 SELECT 283 type, 284 tx_in_id, 285 amount, 286 created_at, 287 debit_payto, 288 account_pub, 289 authorization_pub, 290 authorization_sig 291 FROM tx_in 292 JOIN taler_in USING (tx_in_id) 293 WHERE 294 ", 295 ) 296 }, 297 |r: PgRow| { 298 Ok(match r.try_get("type")? { 299 IncomingType::reserve => IncomingBankTransaction::Reserve { 300 row_id: r.try_get_u64("tx_in_id")?, 301 date: r.try_get_timestamp("created_at")?.into(), 302 amount: r.try_get_amount("amount", currency)?, 303 credit_fee: None, 304 debit_account: r.try_get_payto("debit_payto")?, 305 reserve_pub: r.try_get("account_pub")?, 306 authorization_pub: r.try_get("authorization_pub")?, 307 authorization_sig: r.try_get("authorization_sig")?, 308 }, 309 IncomingType::kyc => IncomingBankTransaction::Kyc { 310 row_id: r.try_get_u64("tx_in_id")?, 311 date: r.try_get_timestamp("created_at")?.into(), 312 amount: r.try_get_amount("amount", currency)?, 313 credit_fee: None, 314 debit_account: r.try_get_payto("debit_payto")?, 315 account_pub: r.try_get("account_pub")?, 316 authorization_pub: r.try_get("authorization_pub")?, 317 authorization_sig: r.try_get("authorization_sig")?, 318 }, 319 IncomingType::map => unimplemented!("MAP are never listed in the history"), 320 }) 321 }, 322 ) 323 .await 324 } 325 326 pub async fn revenue_history( 327 db: &PgPool, 328 params: &History, 329 currency: &Currency, 330 listen: impl FnOnce() -> Receiver<i64>, 331 ) -> sqlx::Result<Vec<RevenueIncomingBankTransaction>> { 332 history( 333 db, 334 "tx_in_id", 335 params, 336 listen, 337 || { 338 QueryBuilder::new( 339 " 340 SELECT 341 tx_in_id, 342 amount, 343 created_at, 344 debit_payto, 345 subject 346 FROM tx_in 347 WHERE 348 ", 349 ) 350 }, 351 |r: PgRow| { 352 Ok(RevenueIncomingBankTransaction { 353 row_id: r.try_get_u64("tx_in_id")?, 354 date: r.try_get_timestamp("created_at")?.into(), 355 amount: r.try_get_amount("amount", currency)?, 356 credit_fee: None, 357 debit_account: r.try_get_payto("debit_payto")?, 358 subject: r.try_get("subject")?, 359 }) 360 }, 361 ) 362 .await 363 } 364 365 pub enum RegistrationResult { 366 Success, 367 ReservePubReuse, 368 } 369 370 pub async fn transfer_register( 371 db: &PgPool, 372 req: &RegistrationRequest, 373 ) -> sqlx::Result<RegistrationResult> { 374 let ty: IncomingType = req.r#type.into(); 375 serialized!( 376 sqlx::query( 377 "SELECT out_reserve_pub_reuse FROM register_prepared_transfers($1,$2,$3,$4,$5,$6)" 378 ) 379 .bind(ty) 380 .bind(&req.account_pub) 381 .bind(&req.authorization_pub) 382 .bind(&req.authorization_sig) 383 .bind(req.recurrent) 384 .bind_timestamp(&Timestamp::now()) 385 .try_map(|r: PgRow| { 386 Ok(if r.try_get_flag("out_reserve_pub_reuse")? { 387 RegistrationResult::ReservePubReuse 388 } else { 389 RegistrationResult::Success 390 }) 391 }) 392 .fetch_one(db) 393 ) 394 } 395 396 pub async fn transfer_unregister(db: &PgPool, req: &Unregistration) -> sqlx::Result<bool> { 397 serialized!( 398 sqlx::query("SELECT out_found FROM delete_prepared_transfers($1,$2)") 399 .bind(&req.authorization_pub) 400 .bind_timestamp(&Timestamp::now()) 401 .try_map(|r: PgRow| r.try_get_flag("out_found")) 402 .fetch_one(db) 403 ) 404 }