db.rs (12634B)
1 /* 2 This file is part of TALER 3 Copyright (C) 2024, 2025, 2026 Taler Systems SA 4 5 TALER is free software; you can redistribute it and/or modify it under the 6 terms of the GNU Affero General Public License as published by the Free Software 7 Foundation; either version 3, or (at your option) any later version. 8 9 TALER is distributed in the hope that it will be useful, but WITHOUT ANY 10 WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR 11 A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. 12 13 You should have received a copy of the GNU Affero General Public License along with 14 TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> 15 */ 16 17 use jiff::Timestamp; 18 use sqlx::{PgPool, QueryBuilder, Row, postgres::PgRow}; 19 use taler_common::{ 20 api_common::{EddsaPublicKey, SafeU64}, 21 api_params::{History, Page}, 22 api_revenue::RevenueIncomingBankTransaction, 23 api_transfer::{RegistrationRequest, Unregistration}, 24 api_wire::{ 25 IncomingBankTransaction, OutgoingBankTransaction, TransferListStatus, TransferRequest, 26 TransferResponse, TransferState, TransferStatus, 27 }, 28 db::IncomingType, 29 types::{ 30 amount::{Amount, Currency}, 31 payto::PaytoURI, 32 }, 33 }; 34 use tokio::sync::watch::{Receiver, Sender}; 35 36 use crate::{ 37 db::{BindHelper as _, TypeHelper as _, history, page}, 38 notification_listener, serialized, 39 subject::fmt_out_subject, 40 }; 41 42 pub async fn notification_listener( 43 pool: PgPool, 44 outgoing_channel: Sender<i64>, 45 incoming_channel: Sender<i64>, 46 ) -> sqlx::Result<()> { 47 notification_listener!(&pool, 48 "outgoing_tx" => (row_id: i64) { 49 outgoing_channel.send_replace(row_id); 50 }, 51 "incoming_tx" => (row_id: i64) { 52 incoming_channel.send_replace(row_id); 53 } 54 ) 55 } 56 57 pub enum TransferResult { 58 Success(TransferResponse), 59 RequestUidReuse, 60 WtidReuse, 61 } 62 63 pub async fn transfer(db: &PgPool, req: &TransferRequest) -> sqlx::Result<TransferResult> { 64 let subject = fmt_out_subject(&req.wtid, &req.exchange_base_url, req.metadata.as_deref()); 65 serialized!( 66 sqlx::query( 67 " 68 SELECT out_request_uid_reuse, out_wtid_reuse, out_transfer_row_id, out_created_at 69 FROM taler_transfer($1,$2,$3,$4,$5,$6,$7,$8) 70 ", 71 ) 72 .bind(req.amount) 73 .bind(req.exchange_base_url.as_str()) 74 .bind(&req.metadata) 75 .bind(&subject) 76 .bind(req.credit_account.raw()) 77 .bind(&req.request_uid) 78 .bind(&req.wtid) 79 .bind_timestamp(&Timestamp::now()) 80 .try_map(|r: PgRow| { 81 Ok(if r.try_get_flag("out_request_uid_reuse")? { 82 TransferResult::RequestUidReuse 83 } else if r.try_get_flag("out_wtid_reuse")? { 84 TransferResult::WtidReuse 85 } else { 86 TransferResult::Success(TransferResponse { 87 row_id: r.try_get_safeu64("out_transfer_row_id")?, 88 timestamp: r.try_get_timestamp("out_created_at")?.into(), 89 }) 90 }) 91 }) 92 .fetch_one(db) 93 ) 94 } 95 96 pub async fn transfer_page( 97 db: &PgPool, 98 status: &Option<TransferState>, 99 params: &Page, 100 currency: &Currency, 101 ) -> sqlx::Result<Vec<TransferListStatus>> { 102 page( 103 db, 104 params, 105 "transfer_id", 106 || { 107 let mut builder = QueryBuilder::new( 108 " 109 SELECT 110 transfer_id, 111 status, 112 amount, 113 credit_payto, 114 created_at 115 FROM transfer 116 JOIN tx_out USING (tx_out_id) 117 WHERE 118 ", 119 ); 120 if let Some(status) = status { 121 builder.push(" status = ").push_bind(status).push(" AND "); 122 } 123 builder 124 }, 125 |r: PgRow| { 126 Ok(TransferListStatus { 127 row_id: r.try_get_safeu64("transfer_id")?, 128 status: r.try_get("status")?, 129 amount: r.try_get_amount("amount", currency)?, 130 credit_account: r.try_get_payto("credit_payto")?, 131 timestamp: r.try_get_timestamp("created_at")?.into(), 132 }) 133 }, 134 ) 135 .await 136 } 137 138 pub async fn transfer_by_id( 139 db: &PgPool, 140 id: u64, 141 currency: &Currency, 142 ) -> sqlx::Result<Option<TransferStatus>> { 143 serialized!( 144 sqlx::query( 145 " 146 SELECT 147 status, 148 status_msg, 149 amount, 150 exchange_base_url, 151 metadata, 152 wtid, 153 credit_payto, 154 created_at 155 FROM transfer 156 JOIN tx_out USING (tx_out_id) 157 WHERE transfer_id = $1 158 ", 159 ) 160 .bind(id as i64) 161 .try_map(|r: PgRow| { 162 Ok(TransferStatus { 163 status: r.try_get("status")?, 164 status_msg: r.try_get("status_msg")?, 165 amount: r.try_get_amount("amount", currency)?, 166 origin_exchange_url: r.try_get("exchange_base_url")?, 167 metadata: r.try_get("metadata")?, 168 wtid: r.try_get("wtid")?, 169 credit_account: r.try_get_payto("credit_payto")?, 170 timestamp: r.try_get_timestamp("created_at")?.into(), 171 }) 172 }) 173 .fetch_optional(db) 174 ) 175 } 176 177 pub async fn outgoing_revenue( 178 db: &PgPool, 179 params: &History, 180 currency: &Currency, 181 listen: impl FnOnce() -> Receiver<i64>, 182 ) -> sqlx::Result<Vec<OutgoingBankTransaction>> { 183 history( 184 db, 185 "transfer_id", 186 params, 187 listen, 188 || { 189 QueryBuilder::new( 190 " 191 SELECT 192 transfer_id, 193 amount, 194 exchange_base_url, 195 metadata, 196 wtid, 197 credit_payto, 198 created_at 199 FROM transfer 200 JOIN tx_out USING (tx_out_id) 201 WHERE status = 'success' AND 202 ", 203 ) 204 }, 205 |r| { 206 Ok(OutgoingBankTransaction { 207 amount: r.try_get_amount("amount", currency)?, 208 debit_fee: None, 209 wtid: r.try_get("wtid")?, 210 credit_account: r.try_get_payto("credit_payto")?, 211 row_id: r.try_get_safeu64("transfer_id")?, 212 date: r.try_get_timestamp("created_at")?.into(), 213 exchange_base_url: r.try_get_url("exchange_base_url")?, 214 metadata: r.try_get("metadata")?, 215 }) 216 }, 217 ) 218 .await 219 } 220 221 pub enum AddIncomingResult { 222 Success { id: SafeU64, created_at: Timestamp }, 223 ReservePubReuse, 224 UnknownMapping, 225 MappingReuse, 226 } 227 228 pub async fn add_incoming( 229 db: &PgPool, 230 amount: &Amount, 231 debit_account: &PaytoURI, 232 subject: &str, 233 timestamp: &Timestamp, 234 ty: IncomingType, 235 account_pub: &EddsaPublicKey, 236 ) -> sqlx::Result<AddIncomingResult> { 237 serialized!( 238 sqlx::query( 239 "SELECT out_reserve_pub_reuse, out_mapping_reuse, out_unknown_mapping, out_tx_row_id, out_created_at 240 FROM add_incoming($1,$2,$3,$4,$5,$6)", 241 ) 242 .bind(amount) 243 .bind(subject) 244 .bind(debit_account.raw()) 245 .bind(ty) 246 .bind(account_pub) 247 .bind_timestamp(timestamp) 248 .try_map(|r: PgRow| { 249 Ok(if r.try_get_flag("out_reserve_pub_reuse")? { 250 AddIncomingResult::ReservePubReuse 251 } else if r.try_get_flag("out_mapping_reuse")? { 252 AddIncomingResult::MappingReuse 253 } else if r.try_get_flag("out_unknown_mapping")? { 254 AddIncomingResult::UnknownMapping 255 } else{ 256 AddIncomingResult::Success { 257 id: r.try_get_safeu64("out_tx_row_id")?, 258 created_at: r.try_get_timestamp("out_created_at")?, 259 } 260 }) 261 }) 262 .fetch_one(db) 263 ) 264 } 265 266 pub async fn incoming_history( 267 db: &PgPool, 268 params: &History, 269 currency: &Currency, 270 listen: impl FnOnce() -> Receiver<i64>, 271 ) -> sqlx::Result<Vec<IncomingBankTransaction>> { 272 history( 273 db, 274 "tx_in_id", 275 params, 276 listen, 277 || { 278 QueryBuilder::new( 279 " 280 SELECT 281 type, 282 tx_in_id, 283 amount, 284 created_at, 285 debit_payto, 286 account_pub, 287 authorization_pub, 288 authorization_sig 289 FROM tx_in 290 JOIN taler_in USING (tx_in_id) 291 WHERE 292 ", 293 ) 294 }, 295 |r: PgRow| { 296 Ok(match r.try_get("type")? { 297 IncomingType::reserve => IncomingBankTransaction::Reserve { 298 row_id: r.try_get_safeu64("tx_in_id")?, 299 date: r.try_get_timestamp("created_at")?.into(), 300 amount: r.try_get_amount("amount", currency)?, 301 credit_fee: None, 302 debit_account: r.try_get_payto("debit_payto")?, 303 reserve_pub: r.try_get("account_pub")?, 304 authorization_pub: r.try_get("authorization_pub")?, 305 authorization_sig: r.try_get("authorization_sig")?, 306 }, 307 IncomingType::kyc => IncomingBankTransaction::Kyc { 308 row_id: r.try_get_safeu64("tx_in_id")?, 309 date: r.try_get_timestamp("created_at")?.into(), 310 amount: r.try_get_amount("amount", currency)?, 311 credit_fee: None, 312 debit_account: r.try_get_payto("debit_payto")?, 313 account_pub: r.try_get("account_pub")?, 314 authorization_pub: r.try_get("authorization_pub")?, 315 authorization_sig: r.try_get("authorization_sig")?, 316 }, 317 IncomingType::map => unimplemented!("MAP are never listed in the history"), 318 }) 319 }, 320 ) 321 .await 322 } 323 324 pub async fn revenue_history( 325 db: &PgPool, 326 params: &History, 327 currency: &Currency, 328 listen: impl FnOnce() -> Receiver<i64>, 329 ) -> sqlx::Result<Vec<RevenueIncomingBankTransaction>> { 330 history( 331 db, 332 "tx_in_id", 333 params, 334 listen, 335 || { 336 QueryBuilder::new( 337 " 338 SELECT 339 tx_in_id, 340 amount, 341 created_at, 342 debit_payto, 343 subject 344 FROM tx_in 345 WHERE 346 ", 347 ) 348 }, 349 |r: PgRow| { 350 Ok(RevenueIncomingBankTransaction { 351 row_id: r.try_get_safeu64("tx_in_id")?, 352 date: r.try_get_timestamp("created_at")?.into(), 353 amount: r.try_get_amount("amount", currency)?, 354 credit_fee: None, 355 debit_account: r.try_get_payto("debit_payto")?, 356 subject: r.try_get("subject")?, 357 }) 358 }, 359 ) 360 .await 361 } 362 363 pub enum RegistrationResult { 364 Success, 365 ReservePubReuse, 366 } 367 368 pub async fn transfer_register( 369 db: &PgPool, 370 req: &RegistrationRequest, 371 ) -> sqlx::Result<RegistrationResult> { 372 let ty: IncomingType = req.r#type.into(); 373 serialized!( 374 sqlx::query( 375 "SELECT out_reserve_pub_reuse FROM register_prepared_transfers($1,$2,$3,$4,$5,$6)" 376 ) 377 .bind(ty) 378 .bind(&req.account_pub) 379 .bind(&req.authorization_pub) 380 .bind(&req.authorization_sig) 381 .bind(req.recurrent) 382 .bind_timestamp(&Timestamp::now()) 383 .try_map(|r: PgRow| { 384 Ok(if r.try_get_flag("out_reserve_pub_reuse")? { 385 RegistrationResult::ReservePubReuse 386 } else { 387 RegistrationResult::Success 388 }) 389 }) 390 .fetch_one(db) 391 ) 392 } 393 394 pub async fn transfer_unregister(db: &PgPool, req: &Unregistration) -> sqlx::Result<bool> { 395 serialized!( 396 sqlx::query("SELECT out_found FROM delete_prepared_transfers($1,$2)") 397 .bind(&req.authorization_pub) 398 .bind_timestamp(&Timestamp::now()) 399 .try_map(|r: PgRow| r.try_get_flag("out_found")) 400 .fetch_one(db) 401 ) 402 }