db.rs (12631B)
1 /* 2 This file is part of TALER 3 Copyright (C) 2024, 2025, 2026 Taler Systems SA 4 5 TALER is free software; you can redistribute it and/or modify it under the 6 terms of the GNU Affero General Public License as published by the Free Software 7 Foundation; either version 3, or (at your option) any later version. 8 9 TALER is distributed in the hope that it will be useful, but WITHOUT ANY 10 WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR 11 A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. 12 13 You should have received a copy of the GNU Affero General Public License along with 14 TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> 15 */ 16 17 use jiff::Timestamp; 18 use sqlx::{PgPool, QueryBuilder, Row, postgres::PgRow}; 19 use taler_api::{ 20 db::{BindHelper, TypeHelper, history, page}, 21 serialized, 22 subject::fmt_out_subject, 23 }; 24 use taler_common::{ 25 api_common::{EddsaPublicKey, SafeU64}, 26 api_params::{History, Page}, 27 api_revenue::RevenueIncomingBankTransaction, 28 api_transfer::{RegistrationRequest, Unregistration}, 29 api_wire::{ 30 IncomingBankTransaction, OutgoingBankTransaction, TransferListStatus, TransferRequest, 31 TransferResponse, TransferState, TransferStatus, 32 }, 33 db::IncomingType, 34 types::{ 35 amount::{Amount, Currency}, 36 payto::PaytoURI, 37 }, 38 }; 39 use tokio::sync::watch::{Receiver, Sender}; 40 41 pub async fn notification_listener( 42 pool: PgPool, 43 outgoing_channel: Sender<i64>, 44 incoming_channel: Sender<i64>, 45 ) -> sqlx::Result<()> { 46 taler_api::notification::notification_listener!(&pool, 47 "outgoing_tx" => (row_id: i64) { 48 outgoing_channel.send_replace(row_id); 49 }, 50 "incoming_tx" => (row_id: i64) { 51 incoming_channel.send_replace(row_id); 52 } 53 ) 54 } 55 56 pub enum TransferResult { 57 Success(TransferResponse), 58 RequestUidReuse, 59 WtidReuse, 60 } 61 62 pub async fn transfer(db: &PgPool, req: &TransferRequest) -> sqlx::Result<TransferResult> { 63 let subject = fmt_out_subject(&req.wtid, &req.exchange_base_url, req.metadata.as_deref()); 64 serialized!( 65 sqlx::query( 66 " 67 SELECT out_request_uid_reuse, out_wtid_reuse, out_transfer_row_id, out_created_at 68 FROM taler_transfer($1,$2,$3,$4,$5,$6,$7,$8) 69 ", 70 ) 71 .bind(&req.amount) 72 .bind(req.exchange_base_url.as_str()) 73 .bind(&req.metadata) 74 .bind(&subject) 75 .bind(req.credit_account.raw()) 76 .bind(&req.request_uid) 77 .bind(&req.wtid) 78 .bind_timestamp(&Timestamp::now()) 79 .try_map(|r: PgRow| { 80 Ok(if r.try_get_flag("out_request_uid_reuse")? { 81 TransferResult::RequestUidReuse 82 } else if r.try_get_flag("out_wtid_reuse")? { 83 TransferResult::WtidReuse 84 } else { 85 TransferResult::Success(TransferResponse { 86 row_id: r.try_get_safeu64("out_transfer_row_id")?, 87 timestamp: r.try_get_timestamp("out_created_at")?.into(), 88 }) 89 }) 90 }) 91 .fetch_one(db) 92 ) 93 } 94 95 pub async fn transfer_page( 96 db: &PgPool, 97 status: &Option<TransferState>, 98 params: &Page, 99 currency: &Currency, 100 ) -> sqlx::Result<Vec<TransferListStatus>> { 101 page( 102 db, 103 "transfer_id", 104 params, 105 || { 106 let mut builder = QueryBuilder::new( 107 " 108 SELECT 109 transfer_id, 110 status, 111 amount, 112 credit_payto, 113 created_at 114 FROM transfer 115 JOIN tx_out USING (tx_out_id) 116 WHERE 117 ", 118 ); 119 if let Some(status) = status { 120 builder.push(" status = ").push_bind(status).push(" AND "); 121 } 122 builder 123 }, 124 |r: PgRow| { 125 Ok(TransferListStatus { 126 row_id: r.try_get_safeu64("transfer_id")?, 127 status: r.try_get("status")?, 128 amount: r.try_get_amount("amount", currency)?, 129 credit_account: r.try_get_payto("credit_payto")?, 130 timestamp: r.try_get_timestamp("created_at")?.into(), 131 }) 132 }, 133 ) 134 .await 135 } 136 137 pub async fn transfer_by_id( 138 db: &PgPool, 139 id: u64, 140 currency: &Currency, 141 ) -> sqlx::Result<Option<TransferStatus>> { 142 serialized!( 143 sqlx::query( 144 " 145 SELECT 146 status, 147 status_msg, 148 amount, 149 exchange_base_url, 150 metadata, 151 wtid, 152 credit_payto, 153 created_at 154 FROM transfer 155 JOIN tx_out USING (tx_out_id) 156 WHERE transfer_id = $1 157 ", 158 ) 159 .bind(id as i64) 160 .try_map(|r: PgRow| { 161 Ok(TransferStatus { 162 status: r.try_get("status")?, 163 status_msg: r.try_get("status_msg")?, 164 amount: r.try_get_amount("amount", currency)?, 165 origin_exchange_url: r.try_get("exchange_base_url")?, 166 metadata: r.try_get("metadata")?, 167 wtid: r.try_get("wtid")?, 168 credit_account: r.try_get_payto("credit_payto")?, 169 timestamp: r.try_get_timestamp("created_at")?.into(), 170 }) 171 }) 172 .fetch_optional(db) 173 ) 174 } 175 176 pub async fn outgoing_revenue( 177 db: &PgPool, 178 params: &History, 179 currency: &Currency, 180 listen: impl FnOnce() -> Receiver<i64>, 181 ) -> sqlx::Result<Vec<OutgoingBankTransaction>> { 182 history( 183 db, 184 "transfer_id", 185 params, 186 listen, 187 || { 188 QueryBuilder::new( 189 " 190 SELECT 191 transfer_id, 192 amount, 193 exchange_base_url, 194 metadata, 195 wtid, 196 credit_payto, 197 created_at 198 FROM transfer 199 JOIN tx_out USING (tx_out_id) 200 WHERE status = 'success' AND 201 ", 202 ) 203 }, 204 |r| { 205 Ok(OutgoingBankTransaction { 206 amount: r.try_get_amount("amount", currency)?, 207 debit_fee: None, 208 wtid: r.try_get("wtid")?, 209 credit_account: r.try_get_payto("credit_payto")?, 210 row_id: r.try_get_safeu64("transfer_id")?, 211 date: r.try_get_timestamp("created_at")?.into(), 212 exchange_base_url: r.try_get_url("exchange_base_url")?, 213 metadata: r.try_get("metadata")?, 214 }) 215 }, 216 ) 217 .await 218 } 219 220 pub enum AddIncomingResult { 221 Success { id: SafeU64, created_at: Timestamp }, 222 ReservePubReuse, 223 UnknownMapping, 224 MappingReuse, 225 } 226 227 pub async fn add_incoming( 228 db: &PgPool, 229 amount: &Amount, 230 debit_account: &PaytoURI, 231 subject: &str, 232 timestamp: &Timestamp, 233 ty: IncomingType, 234 account_pub: &EddsaPublicKey, 235 ) -> sqlx::Result<AddIncomingResult> { 236 serialized!( 237 sqlx::query( 238 "SELECT out_reserve_pub_reuse, out_mapping_reuse, out_unknown_mapping, out_tx_row_id, out_created_at 239 FROM add_incoming($1,$2,$3,$4,$5,$6)", 240 ) 241 .bind(amount) 242 .bind(subject) 243 .bind(debit_account.raw()) 244 .bind(ty) 245 .bind(account_pub) 246 .bind_timestamp(timestamp) 247 .try_map(|r: PgRow| { 248 Ok(if r.try_get_flag("out_reserve_pub_reuse")? { 249 AddIncomingResult::ReservePubReuse 250 } else if r.try_get_flag("out_mapping_reuse")? { 251 AddIncomingResult::MappingReuse 252 } else if r.try_get_flag("out_unknown_mapping")? { 253 AddIncomingResult::UnknownMapping 254 } else{ 255 AddIncomingResult::Success { 256 id: r.try_get_safeu64("out_tx_row_id")?, 257 created_at: r.try_get_timestamp("out_created_at")?, 258 } 259 }) 260 }) 261 .fetch_one(db) 262 ) 263 } 264 265 pub async fn incoming_history( 266 db: &PgPool, 267 params: &History, 268 currency: &Currency, 269 listen: impl FnOnce() -> Receiver<i64>, 270 ) -> sqlx::Result<Vec<IncomingBankTransaction>> { 271 history( 272 db, 273 "tx_in_id", 274 params, 275 listen, 276 || { 277 QueryBuilder::new( 278 " 279 SELECT 280 type, 281 tx_in_id, 282 amount, 283 created_at, 284 debit_payto, 285 account_pub, 286 authorization_pub, 287 authorization_sig 288 FROM tx_in 289 JOIN taler_in USING (tx_in_id) 290 WHERE 291 ", 292 ) 293 }, 294 |r: PgRow| { 295 Ok(match r.try_get("type")? { 296 IncomingType::reserve => IncomingBankTransaction::Reserve { 297 row_id: r.try_get_safeu64("tx_in_id")?, 298 date: r.try_get_timestamp("created_at")?.into(), 299 amount: r.try_get_amount("amount", currency)?, 300 credit_fee: None, 301 debit_account: r.try_get_payto("debit_payto")?, 302 reserve_pub: r.try_get("account_pub")?, 303 authorization_pub: r.try_get("authorization_pub")?, 304 authorization_sig: r.try_get("authorization_sig")?, 305 }, 306 IncomingType::kyc => IncomingBankTransaction::Kyc { 307 row_id: r.try_get_safeu64("tx_in_id")?, 308 date: r.try_get_timestamp("created_at")?.into(), 309 amount: r.try_get_amount("amount", currency)?, 310 credit_fee: None, 311 debit_account: r.try_get_payto("debit_payto")?, 312 account_pub: r.try_get("account_pub")?, 313 authorization_pub: r.try_get("authorization_pub")?, 314 authorization_sig: r.try_get("authorization_sig")?, 315 }, 316 IncomingType::map => unimplemented!("MAP are never listed in the history"), 317 }) 318 }, 319 ) 320 .await 321 } 322 323 pub async fn revenue_history( 324 db: &PgPool, 325 params: &History, 326 currency: &Currency, 327 listen: impl FnOnce() -> Receiver<i64>, 328 ) -> sqlx::Result<Vec<RevenueIncomingBankTransaction>> { 329 history( 330 db, 331 "tx_in_id", 332 params, 333 listen, 334 || { 335 QueryBuilder::new( 336 " 337 SELECT 338 tx_in_id, 339 amount, 340 created_at, 341 debit_payto, 342 subject 343 FROM tx_in 344 WHERE 345 ", 346 ) 347 }, 348 |r: PgRow| { 349 Ok(RevenueIncomingBankTransaction { 350 row_id: r.try_get_safeu64("tx_in_id")?, 351 date: r.try_get_timestamp("created_at")?.into(), 352 amount: r.try_get_amount("amount", currency)?, 353 credit_fee: None, 354 debit_account: r.try_get_payto("debit_payto")?, 355 subject: r.try_get("subject")?, 356 }) 357 }, 358 ) 359 .await 360 } 361 362 pub enum RegistrationResult { 363 Success, 364 ReservePubReuse, 365 } 366 367 pub async fn transfer_register( 368 db: &PgPool, 369 req: &RegistrationRequest, 370 ) -> sqlx::Result<RegistrationResult> { 371 let ty: IncomingType = req.r#type.into(); 372 serialized!( 373 sqlx::query( 374 "SELECT out_reserve_pub_reuse FROM register_prepared_transfers($1,$2,$3,$4,$5,$6)" 375 ) 376 .bind(ty) 377 .bind(&req.account_pub) 378 .bind(&req.authorization_pub) 379 .bind(&req.authorization_sig) 380 .bind(req.recurrent) 381 .bind_timestamp(&Timestamp::now()) 382 .try_map(|r: PgRow| { 383 Ok(if r.try_get_flag("out_reserve_pub_reuse")? { 384 RegistrationResult::ReservePubReuse 385 } else { 386 RegistrationResult::Success 387 }) 388 }) 389 .fetch_one(db) 390 ) 391 } 392 393 pub async fn transfer_unregister(db: &PgPool, req: &Unregistration) -> sqlx::Result<bool> { 394 serialized!( 395 sqlx::query("SELECT out_found FROM delete_prepared_transfers($1,$2)") 396 .bind(&req.authorization_pub) 397 .bind_timestamp(&Timestamp::now()) 398 .try_map(|r: PgRow| r.try_get_flag("out_found")) 399 .fetch_one(db) 400 ) 401 }