db.rs (50241B)
1 /* 2 This file is part of TALER 3 Copyright (C) 2025, 2026 Taler Systems SA 4 5 TALER is free software; you can redistribute it and/or modify it under the 6 terms of the GNU Affero General Public License as published by the Free Software 7 Foundation; either version 3, or (at your option) any later version. 8 9 TALER is distributed in the hope that it will be useful, but WITHOUT ANY 10 WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR 11 A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. 12 13 You should have received a copy of the GNU Affero General Public License along with 14 TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> 15 */ 16 17 use std::fmt::Display; 18 19 use compact_str::CompactString; 20 use jiff::{Timestamp, civil::Date, tz::TimeZone}; 21 use serde::{Serialize, de::DeserializeOwned}; 22 use sqlx::{PgConnection, PgPool, QueryBuilder, Row, postgres::PgRow}; 23 use taler_api::{ 24 db::{BindHelper, TypeHelper, history, page}, 25 serialized, 26 subject::{IncomingSubject, OutgoingSubject, fmt_out_subject}, 27 }; 28 use taler_common::{ 29 api::{ 30 HashCode, ShortHashCode, 31 params::{History, Page}, 32 prepared::{RegistrationRequest, Unregistration}, 33 revenue::RevenueIncomingBankTransaction, 34 wire::{ 35 IncomingBankTransaction, OutgoingBankTransaction, TransferListStatus, TransferState, 36 TransferStatus, 37 }, 38 }, 39 config::Config, 40 db::IncomingType, 41 types::{ 42 amount::{Amount, Decimal}, 43 payto::PaytoImpl as _, 44 }, 45 }; 46 use tokio::sync::watch::{Receiver, Sender}; 47 use url::Url; 48 49 use crate::{FullHuPayto, config::parse_db_cfg, constants::CURR, magnet_api::types::TxStatus}; 50 51 const SCHEMA: &str = "magnet_bank"; 52 53 pub async fn pool(cfg: &Config) -> anyhow::Result<PgPool> { 54 let db = parse_db_cfg(cfg)?; 55 let pool = taler_common::db::pool(db.cfg, SCHEMA).await?; 56 Ok(pool) 57 } 58 59 pub async fn dbinit(cfg: &Config, reset: bool) -> anyhow::Result<PgPool> { 60 let db_cfg = parse_db_cfg(cfg)?; 61 let pool = taler_common::db::pool(db_cfg.cfg, SCHEMA).await?; 62 let mut db = pool.acquire().await?; 63 taler_common::db::dbinit(&mut db, db_cfg.sql_dir.as_ref(), "magnet-bank", reset).await?; 64 Ok(pool) 65 } 66 67 pub async fn notification_listener( 68 pool: PgPool, 69 in_channel: Sender<i64>, 70 taler_in_channel: Sender<i64>, 71 out_channel: Sender<i64>, 72 taler_out_channel: Sender<i64>, 73 ) { 74 taler_api::notification::notification_listener!(&pool, 75 "tx_in" => (row_id: i64) { 76 in_channel.send_replace(row_id); 77 }, 78 "taler_in" => (row_id: i64) { 79 taler_in_channel.send_replace(row_id); 80 }, 81 "tx_out" => (row_id: i64) { 82 out_channel.send_replace(row_id); 83 }, 84 "taler_out" => (row_id: i64) { 85 taler_out_channel.send_replace(row_id); 86 } 87 ) 88 } 89 90 #[derive(Debug, Clone)] 91 pub struct TxIn { 92 pub code: u64, 93 pub amount: Amount, 94 pub subject: Box<str>, 95 pub debtor: FullHuPayto, 96 pub value_date: Date, 97 pub status: TxStatus, 98 } 99 100 impl Display for TxIn { 101 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { 102 let Self { 103 code, 104 amount, 105 subject, 106 debtor, 107 value_date, 108 status, 109 } = self; 110 write!( 111 f, 112 "{value_date} {code} {amount} ({} {}) {status:?} '{subject}'", 113 debtor.bban(), 114 debtor.name 115 ) 116 } 117 } 118 119 #[derive(Debug, Clone)] 120 pub struct TxOut { 121 pub code: u64, 122 pub amount: Amount, 123 pub subject: Box<str>, 124 pub creditor: FullHuPayto, 125 pub value_date: Date, 126 pub status: TxStatus, 127 } 128 129 impl Display for TxOut { 130 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { 131 let Self { 132 code, 133 amount, 134 subject, 135 creditor, 136 value_date, 137 status, 138 } = self; 139 write!( 140 f, 141 "{value_date} {code} {amount} ({} {}) {status:?} '{subject}'", 142 creditor.bban(), 143 &creditor.name 144 ) 145 } 146 } 147 148 #[derive(Debug, PartialEq, Eq)] 149 pub struct Initiated { 150 pub id: u64, 151 pub amount: Amount, 152 pub subject: Box<str>, 153 pub creditor: FullHuPayto, 154 } 155 156 impl Display for Initiated { 157 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { 158 let Self { 159 id, 160 amount, 161 subject, 162 creditor, 163 } = self; 164 write!( 165 f, 166 "{id} {amount} ({} {}) '{subject}'", 167 creditor.bban(), 168 &creditor.name 169 ) 170 } 171 } 172 173 #[derive(Debug, Clone)] 174 pub struct TxInAdmin { 175 pub amount: Amount, 176 pub subject: String, 177 pub debtor: FullHuPayto, 178 pub metadata: IncomingSubject, 179 } 180 181 /// Lock the database for worker execution 182 pub async fn worker_lock(e: &mut PgConnection) -> sqlx::Result<bool> { 183 sqlx::query("SELECT pg_try_advisory_lock(42)") 184 .try_map(|r: PgRow| r.try_get(0)) 185 .fetch_one(e) 186 .await 187 } 188 189 #[derive(Debug, PartialEq, Eq)] 190 pub enum AddIncomingResult { 191 Success { 192 new: bool, 193 pending: bool, 194 row_id: u64, 195 valued_at: Date, 196 }, 197 ReservePubReuse, 198 UnknownMapping, 199 MappingReuse, 200 } 201 202 pub async fn register_tx_in_admin( 203 db: &PgPool, 204 tx: &TxInAdmin, 205 now: &Timestamp, 206 ) -> sqlx::Result<AddIncomingResult> { 207 serialized!( 208 sqlx::query( 209 " 210 SELECT out_reserve_pub_reuse, out_mapping_reuse, out_unknown_mapping, out_tx_row_id, out_valued_at, out_new, out_pending 211 FROM register_tx_in(NULL, $1, $2, $3, $4, $5, $6, $7, $5) 212 ", 213 ) 214 .bind(tx.amount) 215 .bind(&tx.subject) 216 .bind(tx.debtor.iban()) 217 .bind(&tx.debtor.name) 218 .bind_date(&now.to_zoned(TimeZone::UTC).date()) 219 .bind(tx.metadata.ty()) 220 .bind(tx.metadata.key()) 221 .try_map(|r: PgRow| { 222 Ok(if r.try_get_flag(0)? { 223 AddIncomingResult::ReservePubReuse 224 } else if r.try_get_flag(1)? { 225 AddIncomingResult::MappingReuse 226 } else if r.try_get_flag(2)? { 227 AddIncomingResult::UnknownMapping 228 } else { 229 AddIncomingResult::Success { 230 row_id: r.try_get_u64(3)?, 231 valued_at: r.try_get_date(4)?, 232 new: r.try_get(5)?, 233 pending: r.try_get(6)? 234 } 235 }) 236 }) 237 .fetch_one(db) 238 ) 239 } 240 241 pub async fn register_tx_in( 242 db: &mut PgConnection, 243 tx: &TxIn, 244 subject: &Option<IncomingSubject>, 245 now: &Timestamp, 246 ) -> sqlx::Result<AddIncomingResult> { 247 serialized!( 248 sqlx::query( 249 " 250 SELECT out_reserve_pub_reuse, out_mapping_reuse, out_unknown_mapping, out_tx_row_id, out_valued_at, out_new, out_pending 251 FROM register_tx_in($1, $2, $3, $4, $5, $6, $7, $8, $9) 252 ", 253 ) 254 .bind(tx.code as i64) 255 .bind(tx.amount) 256 .bind(&tx.subject) 257 .bind(tx.debtor.iban()) 258 .bind(&tx.debtor.name) 259 .bind_date(&tx.value_date) 260 .bind(subject.as_ref().map(|it| it.ty())) 261 .bind(subject.as_ref().map(|it| it.key())) 262 .bind_timestamp(now) 263 .try_map(|r: PgRow| { 264 Ok(if r.try_get_flag(0)? { 265 AddIncomingResult::ReservePubReuse 266 } else if r.try_get_flag(1)? { 267 AddIncomingResult::MappingReuse 268 } else if r.try_get_flag(2)? { 269 AddIncomingResult::UnknownMapping 270 } else { 271 AddIncomingResult::Success { 272 row_id: r.try_get_u64(3)?, 273 valued_at: r.try_get_date(4)?, 274 new: r.try_get(5)?, 275 pending: r.try_get(6)? 276 } 277 }) 278 }) 279 .fetch_one(&mut *db) 280 ) 281 } 282 283 #[derive(Debug)] 284 pub enum TxOutKind { 285 Simple, 286 Bounce(u32), 287 Talerable(OutgoingSubject), 288 } 289 290 #[derive(Debug, Clone, Copy, PartialEq, Eq, sqlx::Type)] 291 #[allow(non_camel_case_types)] 292 #[sqlx(type_name = "register_result")] 293 pub enum RegisterResult { 294 /// Already registered 295 idempotent, 296 /// Initiated transaction 297 known, 298 /// Recovered unknown outgoing transaction 299 recovered, 300 } 301 302 #[derive(Debug, PartialEq, Eq)] 303 pub struct AddOutgoingResult { 304 pub result: RegisterResult, 305 pub row_id: u64, 306 } 307 308 pub async fn register_tx_out( 309 db: &mut PgConnection, 310 tx: &TxOut, 311 kind: &TxOutKind, 312 now: &Timestamp, 313 ) -> sqlx::Result<AddOutgoingResult> { 314 serialized!({ 315 let query = sqlx::query( 316 " 317 SELECT out_result, out_tx_row_id 318 FROM register_tx_out($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) 319 ", 320 ) 321 .bind(tx.code as i64) 322 .bind(tx.amount) 323 .bind(&tx.subject) 324 .bind(tx.creditor.iban()) 325 .bind(&tx.creditor.name) 326 .bind_date(&tx.value_date); 327 let query = match kind { 328 TxOutKind::Simple => query 329 .bind(None::<&[u8]>) 330 .bind(None::<&str>) 331 .bind(None::<&str>) 332 .bind(None::<i64>), 333 TxOutKind::Bounce(bounced) => query 334 .bind(None::<&[u8]>) 335 .bind(None::<&str>) 336 .bind(None::<&str>) 337 .bind(*bounced as i64), 338 TxOutKind::Talerable(subject) => query 339 .bind(&subject.wtid) 340 .bind(subject.exchange_base_url.as_str()) 341 .bind(&subject.metadata) 342 .bind(None::<i64>), 343 }; 344 query 345 .bind_timestamp(now) 346 .try_map(|r: PgRow| { 347 Ok(AddOutgoingResult { 348 result: r.try_get(0)?, 349 row_id: r.try_get_u64(1)?, 350 }) 351 }) 352 .fetch_one(&mut *db) 353 }) 354 } 355 356 #[derive(Debug, PartialEq, Eq)] 357 pub struct OutFailureResult { 358 pub initiated_id: Option<u64>, 359 pub new: bool, 360 } 361 362 pub async fn register_tx_out_failure( 363 db: &mut PgConnection, 364 code: u64, 365 bounced: Option<u32>, 366 now: &Timestamp, 367 ) -> sqlx::Result<OutFailureResult> { 368 serialized!( 369 sqlx::query( 370 " 371 SELECT out_new, out_initiated_id 372 FROM register_tx_out_failure($1, $2, $3) 373 ", 374 ) 375 .bind(code as i64) 376 .bind(bounced.map(|i| i as i32)) 377 .bind_timestamp(now) 378 .try_map(|r: PgRow| { 379 Ok(OutFailureResult { 380 new: r.try_get(0)?, 381 initiated_id: r.try_get::<Option<i64>, _>(1)?.map(|i| i as u64), 382 }) 383 }) 384 .fetch_one(&mut *db) 385 ) 386 } 387 388 #[derive(Debug, PartialEq, Eq)] 389 pub enum TransferResult { 390 Success { id: u64, initiated_at: Timestamp }, 391 RequestUidReuse, 392 WtidReuse, 393 } 394 395 #[derive(Debug, Clone)] 396 pub struct Transfer { 397 pub request_uid: HashCode, 398 pub amount: Decimal, 399 pub exchange_base_url: Url, 400 pub metadata: Option<CompactString>, 401 pub wtid: ShortHashCode, 402 pub creditor: FullHuPayto, 403 } 404 405 pub async fn make_transfer( 406 db: &PgPool, 407 tx: &Transfer, 408 now: &Timestamp, 409 ) -> sqlx::Result<TransferResult> { 410 let subject = fmt_out_subject(&tx.wtid, &tx.exchange_base_url, tx.metadata.as_deref()); 411 serialized!( 412 sqlx::query( 413 " 414 SELECT out_request_uid_reuse, out_wtid_reuse, out_initiated_row_id, out_initiated_at 415 FROM taler_transfer($1, $2, $3, $4, $5, $6, $7, $8, $9) 416 ", 417 ) 418 .bind(&tx.request_uid) 419 .bind(&tx.wtid) 420 .bind(&subject) 421 .bind(tx.amount) 422 .bind(tx.exchange_base_url.as_str()) 423 .bind(&tx.metadata) 424 .bind(tx.creditor.iban()) 425 .bind(&tx.creditor.name) 426 .bind_timestamp(now) 427 .try_map(|r: PgRow| { 428 Ok(if r.try_get_flag(0)? { 429 TransferResult::RequestUidReuse 430 } else if r.try_get_flag(1)? { 431 TransferResult::WtidReuse 432 } else { 433 TransferResult::Success { 434 id: r.try_get_u64(2)?, 435 initiated_at: r.try_get_timestamp(3)?, 436 } 437 }) 438 }) 439 .fetch_one(db) 440 ) 441 } 442 443 #[derive(Debug, PartialEq, Eq)] 444 pub struct BounceResult { 445 pub tx_id: u64, 446 pub tx_new: bool, 447 pub bounce_id: u64, 448 pub bounce_new: bool, 449 } 450 451 pub async fn register_bounce_tx_in( 452 db: &mut PgConnection, 453 tx: &TxIn, 454 reason: &str, 455 now: &Timestamp, 456 ) -> sqlx::Result<BounceResult> { 457 serialized!( 458 sqlx::query( 459 " 460 SELECT out_tx_row_id, out_tx_new, out_bounce_row_id, out_bounce_new 461 FROM register_bounce_tx_in($1, $2, $3, $4, $5, $6, $7, $8) 462 ", 463 ) 464 .bind(tx.code as i64) 465 .bind(tx.amount) 466 .bind(&tx.subject) 467 .bind(tx.debtor.iban()) 468 .bind(&tx.debtor.name) 469 .bind_date(&tx.value_date) 470 .bind(reason) 471 .bind_timestamp(now) 472 .try_map(|r: PgRow| { 473 Ok(BounceResult { 474 tx_id: r.try_get_u64(0)?, 475 tx_new: r.try_get(1)?, 476 bounce_id: r.try_get_u64(2)?, 477 bounce_new: r.try_get(3)?, 478 }) 479 }) 480 .fetch_one(&mut *db) 481 ) 482 } 483 484 pub async fn transfer_page( 485 db: &PgPool, 486 status: &Option<TransferState>, 487 params: &Page, 488 ) -> sqlx::Result<Vec<TransferListStatus>> { 489 page( 490 db, 491 params, 492 "initiated_id", 493 || { 494 let mut builder = QueryBuilder::new( 495 " 496 SELECT 497 initiated_id, 498 status, 499 amount, 500 credit_account, 501 credit_name, 502 initiated_at 503 FROM transfer 504 JOIN initiated USING (initiated_id) 505 WHERE 506 ", 507 ); 508 if let Some(status) = status { 509 builder.push(" status = ").push_bind(status).push(" AND "); 510 } 511 builder 512 }, 513 |r: PgRow| { 514 Ok(TransferListStatus { 515 row_id: r.try_get_u64(0)?, 516 status: r.try_get(1)?, 517 amount: r.try_get_amount(2, &CURR)?, 518 credit_account: r.try_get_iban(3)?.as_full_uri(r.try_get(4)?), 519 timestamp: r.try_get_timestamp(5)?.into(), 520 }) 521 }, 522 ) 523 .await 524 } 525 526 pub async fn outgoing_history( 527 db: &PgPool, 528 params: &History, 529 listen: impl FnOnce() -> Receiver<i64>, 530 ) -> sqlx::Result<Vec<OutgoingBankTransaction>> { 531 history( 532 db, 533 "tx_out_id", 534 params, 535 listen, 536 || { 537 QueryBuilder::new( 538 " 539 SELECT 540 tx_out_id, 541 amount, 542 credit_account, 543 credit_name, 544 valued_at, 545 exchange_base_url, 546 metadata, 547 wtid 548 FROM taler_out 549 JOIN tx_out USING (tx_out_id) 550 WHERE 551 ", 552 ) 553 }, 554 |r: PgRow| { 555 Ok(OutgoingBankTransaction { 556 row_id: r.try_get_u64(0)?, 557 amount: r.try_get_amount(1, &CURR)?, 558 debit_fee: None, 559 credit_account: r.try_get_iban(2)?.as_full_uri(r.try_get(3)?), 560 date: r.try_get_timestamp(4)?.into(), 561 exchange_base_url: r.try_get_url(5)?, 562 metadata: r.try_get(6)?, 563 wtid: r.try_get(7)?, 564 }) 565 }, 566 ) 567 .await 568 } 569 570 pub async fn incoming_history( 571 db: &PgPool, 572 params: &History, 573 listen: impl FnOnce() -> Receiver<i64>, 574 ) -> sqlx::Result<Vec<IncomingBankTransaction>> { 575 history( 576 db, 577 "tx_in_id", 578 params, 579 listen, 580 || { 581 QueryBuilder::new( 582 " 583 SELECT 584 type, 585 tx_in_id, 586 amount, 587 debit_account, 588 debit_name, 589 valued_at, 590 metadata, 591 authorization_pub, 592 authorization_sig 593 FROM taler_in 594 JOIN tx_in USING (tx_in_id) 595 WHERE 596 ", 597 ) 598 }, 599 |r: PgRow| { 600 Ok(match r.try_get(0)? { 601 IncomingType::reserve => IncomingBankTransaction::Reserve { 602 row_id: r.try_get_u64(1)?, 603 amount: r.try_get_amount(2, &CURR)?, 604 credit_fee: None, 605 debit_account: r.try_get_iban(3)?.as_full_uri(r.try_get(4)?), 606 date: r.try_get_timestamp(5)?.into(), 607 reserve_pub: r.try_get(6)?, 608 authorization_pub: r.try_get(7)?, 609 authorization_sig: r.try_get(8)?, 610 }, 611 IncomingType::kyc => IncomingBankTransaction::Kyc { 612 row_id: r.try_get_u64(1)?, 613 amount: r.try_get_amount(2, &CURR)?, 614 credit_fee: None, 615 debit_account: r.try_get_iban(3)?.as_full_uri(r.try_get(4)?), 616 date: r.try_get_timestamp(5)?.into(), 617 account_pub: r.try_get(6)?, 618 authorization_pub: r.try_get(7)?, 619 authorization_sig: r.try_get(8)?, 620 }, 621 IncomingType::map => unimplemented!("MAP are never listed in the history"), 622 }) 623 }, 624 ) 625 .await 626 } 627 628 pub async fn revenue_history( 629 db: &PgPool, 630 params: &History, 631 listen: impl FnOnce() -> Receiver<i64>, 632 ) -> sqlx::Result<Vec<RevenueIncomingBankTransaction>> { 633 history( 634 db, 635 "tx_in_id", 636 params, 637 listen, 638 || { 639 QueryBuilder::new( 640 " 641 SELECT 642 tx_in_id, 643 valued_at, 644 amount, 645 debit_account, 646 debit_name, 647 subject 648 FROM tx_in 649 WHERE 650 ", 651 ) 652 }, 653 |r: PgRow| { 654 Ok(RevenueIncomingBankTransaction { 655 row_id: r.try_get_u64(0)?, 656 date: r.try_get_timestamp(1)?.into(), 657 amount: r.try_get_amount(2, &CURR)?, 658 credit_fee: None, 659 debit_account: r.try_get_iban(3)?.as_full_uri(r.try_get(4)?), 660 subject: r.try_get(5)?, 661 }) 662 }, 663 ) 664 .await 665 } 666 667 pub async fn transfer_by_id(db: &PgPool, id: u64) -> sqlx::Result<Option<TransferStatus>> { 668 serialized!( 669 sqlx::query( 670 " 671 SELECT 672 status, 673 status_msg, 674 amount, 675 exchange_base_url, 676 metadata, 677 wtid, 678 credit_account, 679 credit_name, 680 initiated_at 681 FROM transfer 682 JOIN initiated USING (initiated_id) 683 WHERE initiated_id = $1 684 ", 685 ) 686 .bind(id as i64) 687 .try_map(|r: PgRow| { 688 Ok(TransferStatus { 689 status: r.try_get(0)?, 690 status_msg: r.try_get(1)?, 691 amount: r.try_get_amount(2, &CURR)?, 692 origin_exchange_url: r.try_get(3)?, 693 metadata: r.try_get(4)?, 694 wtid: r.try_get(5)?, 695 credit_account: r.try_get_iban(6)?.as_full_uri(r.try_get(7)?), 696 timestamp: r.try_get_timestamp(8)?.into(), 697 }) 698 }) 699 .fetch_optional(db) 700 ) 701 } 702 703 /** Get a batch of pending initiated transactions not attempted since [start] */ 704 pub async fn pending_batch( 705 db: &mut PgConnection, 706 start: &Timestamp, 707 ) -> sqlx::Result<Vec<Initiated>> { 708 serialized!( 709 sqlx::query( 710 " 711 SELECT initiated_id, amount, subject, credit_account, credit_name 712 FROM initiated 713 WHERE magnet_code IS NULL 714 AND status='pending' 715 AND (last_submitted IS NULL OR last_submitted < $1) 716 LIMIT 100 717 ", 718 ) 719 .bind_timestamp(start) 720 .try_map(|r: PgRow| { 721 Ok(Initiated { 722 id: r.try_get_u64(0)?, 723 amount: r.try_get_amount(1, &CURR)?, 724 subject: r.try_get(2)?, 725 creditor: FullHuPayto::new(r.try_get_parse(3)?, r.try_get(4)?), 726 }) 727 }) 728 .fetch_all(&mut *db) 729 ) 730 } 731 732 /** Get an initiated transaction matching the given magnet [code] */ 733 pub async fn initiated_by_code( 734 db: &mut PgConnection, 735 code: u64, 736 ) -> sqlx::Result<Option<Initiated>> { 737 serialized!( 738 sqlx::query( 739 " 740 SELECT initiated_id, amount, subject, credit_account, credit_name 741 FROM initiated 742 WHERE magnet_code IS $1 743 ", 744 ) 745 .bind(code as i64) 746 .try_map(|r: PgRow| { 747 Ok(Initiated { 748 id: r.try_get_u64(0)?, 749 amount: r.try_get_amount(1, &CURR)?, 750 subject: r.try_get(2)?, 751 creditor: FullHuPayto::new(r.try_get_parse(3)?, r.try_get(4)?), 752 }) 753 }) 754 .fetch_optional(&mut *db) 755 ) 756 } 757 758 /** Update status of a successful submitted initiated transaction */ 759 pub async fn initiated_submit_success( 760 db: &mut PgConnection, 761 id: u64, 762 timestamp: &Timestamp, 763 magnet_code: u64, 764 ) -> sqlx::Result<()> { 765 serialized!( 766 sqlx::query( 767 " 768 UPDATE initiated 769 SET status='pending', submission_counter=submission_counter+1, last_submitted=$1, magnet_code=$2 770 WHERE initiated_id=$3 771 " 772 ).bind_timestamp(timestamp) 773 .bind(magnet_code as i64) 774 .bind(id as i64) 775 .execute(&mut *db) 776 )?; 777 Ok(()) 778 } 779 780 /** Update status of a permanently failed initiated transaction */ 781 pub async fn initiated_submit_permanent_failure( 782 db: &mut PgConnection, 783 id: u64, 784 timestamp: &Timestamp, 785 msg: &str, 786 ) -> sqlx::Result<()> { 787 serialized!( 788 sqlx::query( 789 " 790 UPDATE initiated 791 SET status='permanent_failure', status_msg=$2 792 WHERE initiated_id=$3 793 ", 794 ) 795 .bind_timestamp(timestamp) 796 .bind(msg) 797 .bind(id as i64) 798 .execute(&mut *db) 799 )?; 800 Ok(()) 801 } 802 803 /** Check if an initiated transaction exist for a magnet code */ 804 pub async fn initiated_exists_for_code( 805 db: &mut PgConnection, 806 code: u64, 807 ) -> sqlx::Result<Option<u64>> { 808 serialized!( 809 sqlx::query("SELECT initiated_id FROM initiated WHERE magnet_code=$1") 810 .bind(code as i64) 811 .try_map(|r| Ok(r.try_get::<i64, _>(0)? as u64)) 812 .fetch_optional(&mut *db) 813 ) 814 } 815 816 /** Get JSON value from KV table */ 817 pub async fn kv_get<T: DeserializeOwned + Unpin + Send>( 818 db: &mut PgConnection, 819 key: &str, 820 ) -> sqlx::Result<Option<T>> { 821 serialized!( 822 sqlx::query("SELECT value FROM kv WHERE key=$1") 823 .bind(key) 824 .try_map(|r| Ok(r.try_get::<sqlx::types::Json<T>, _>(0)?.0)) 825 .fetch_optional(&mut *db) 826 ) 827 } 828 829 /** Set JSON value in KV table */ 830 pub async fn kv_set<T: Serialize>(db: &mut PgConnection, key: &str, value: &T) -> sqlx::Result<()> { 831 serialized!( 832 sqlx::query("INSERT INTO kv (key, value) VALUES ($1, $2) ON CONFLICT (key) DO UPDATE SET value=EXCLUDED.value") 833 .bind(key) 834 .bind(sqlx::types::Json(value)) 835 .execute(&mut *db) 836 )?; 837 Ok(()) 838 } 839 840 pub enum RegistrationResult { 841 Success, 842 ReservePubReuse, 843 } 844 845 pub async fn transfer_register( 846 db: &PgPool, 847 req: &RegistrationRequest, 848 ) -> sqlx::Result<RegistrationResult> { 849 let ty: IncomingType = req.r#type.into(); 850 serialized!( 851 sqlx::query( 852 "SELECT out_reserve_pub_reuse FROM register_prepared_transfers($1,$2,$3,$4,$5,$6)" 853 ) 854 .bind(ty) 855 .bind(&req.account_pub) 856 .bind(&req.authorization_pub) 857 .bind(&req.authorization_sig) 858 .bind(req.recurrent) 859 .bind_timestamp(&Timestamp::now()) 860 .try_map(|r: PgRow| { 861 Ok(if r.try_get_flag("out_reserve_pub_reuse")? { 862 RegistrationResult::ReservePubReuse 863 } else { 864 RegistrationResult::Success 865 }) 866 }) 867 .fetch_one(db) 868 ) 869 } 870 871 pub async fn transfer_unregister(db: &PgPool, req: &Unregistration) -> sqlx::Result<bool> { 872 serialized!( 873 sqlx::query("SELECT out_found FROM delete_prepared_transfers($1,$2)") 874 .bind(&req.authorization_pub) 875 .bind_timestamp(&Timestamp::now()) 876 .try_map(|r: PgRow| r.try_get_flag("out_found")) 877 .fetch_one(db) 878 ) 879 } 880 881 #[cfg(test)] 882 mod test { 883 use std::assert_matches; 884 885 use jiff::{Span, Timestamp, Zoned}; 886 use serde_json::json; 887 use sqlx::{PgPool, Postgres, pool::PoolConnection, postgres::PgRow}; 888 use taler_api::{ 889 db::TypeHelper, 890 notification::dummy_listen, 891 subject::{IncomingSubject, OutgoingSubject}, 892 }; 893 use taler_common::{ 894 api::{ 895 EddsaPublicKey, HashCode, ShortHashCode, 896 params::{History, Page}, 897 }, 898 types::{ 899 amount::{amount, decimal}, 900 url, 901 utils::now_sql_stable_ts, 902 }, 903 }; 904 905 use super::TxInAdmin; 906 use crate::{ 907 constants::CONFIG_SOURCE, 908 db::{ 909 self, AddIncomingResult, AddOutgoingResult, BounceResult, Initiated, OutFailureResult, 910 TransferResult, TxIn, TxOut, TxOutKind, kv_get, kv_set, make_transfer, 911 register_bounce_tx_in, register_tx_in, register_tx_in_admin, register_tx_out, 912 }, 913 magnet_api::types::TxStatus, 914 magnet_payto, 915 }; 916 917 async fn setup() -> (PoolConnection<Postgres>, PgPool) { 918 taler_test_utils::db::db_test_setup(CONFIG_SOURCE).await 919 } 920 921 #[tokio::test] 922 async fn kv() { 923 let (mut db, _) = setup().await; 924 925 let value = json!({ 926 "name": "Mr Smith", 927 "no way": 32 928 }); 929 930 assert_eq!( 931 kv_get::<serde_json::Value>(&mut db, "value").await.unwrap(), 932 None 933 ); 934 kv_set(&mut db, "value", &value).await.unwrap(); 935 kv_set(&mut db, "value", &value).await.unwrap(); 936 assert_eq!( 937 kv_get::<serde_json::Value>(&mut db, "value").await.unwrap(), 938 Some(value) 939 ); 940 } 941 942 #[tokio::test] 943 async fn tx_in() { 944 let (mut db, pool) = setup().await; 945 946 let mut routine = async |first: &Option<IncomingSubject>, 947 second: &Option<IncomingSubject>| { 948 let (id, code) = 949 sqlx::query("SELECT count(*) + 1, COALESCE(max(magnet_code), 0) + 20 FROM tx_in") 950 .try_map(|r: PgRow| Ok((r.try_get_u64(0)?, r.try_get_u64(1)?))) 951 .fetch_one(&mut *db) 952 .await 953 .unwrap(); 954 let now = now_sql_stable_ts(); 955 let date = Zoned::now().date(); 956 let later = date.tomorrow().unwrap(); 957 let tx = TxIn { 958 code, 959 amount: amount("EUR:10"), 960 subject: "subject".into(), 961 debtor: magnet_payto( 962 "payto://iban/HU30162000031000163100000000?receiver-name=name", 963 ), 964 value_date: date, 965 status: TxStatus::Completed, 966 }; 967 // Insert 968 assert_eq!( 969 register_tx_in(&mut db, &tx, first, &now) 970 .await 971 .expect("register tx in"), 972 AddIncomingResult::Success { 973 new: true, 974 pending: false, 975 row_id: id, 976 valued_at: date 977 } 978 ); 979 // Idempotent 980 assert_eq!( 981 register_tx_in( 982 &mut db, 983 &TxIn { 984 value_date: later, 985 ..tx.clone() 986 }, 987 first, 988 &now 989 ) 990 .await 991 .expect("register tx in"), 992 AddIncomingResult::Success { 993 new: false, 994 pending: false, 995 row_id: id, 996 valued_at: date 997 } 998 ); 999 // Many 1000 assert_eq!( 1001 register_tx_in( 1002 &mut db, 1003 &TxIn { 1004 code: code + 1, 1005 value_date: later, 1006 ..tx 1007 }, 1008 second, 1009 &now 1010 ) 1011 .await 1012 .expect("register tx in"), 1013 AddIncomingResult::Success { 1014 new: true, 1015 pending: false, 1016 row_id: id + 1, 1017 valued_at: later 1018 } 1019 ); 1020 }; 1021 1022 // Empty db 1023 assert_eq!( 1024 db::revenue_history(&pool, &History::default(), dummy_listen) 1025 .await 1026 .unwrap(), 1027 Vec::new() 1028 ); 1029 assert_eq!( 1030 db::incoming_history(&pool, &History::default(), dummy_listen) 1031 .await 1032 .unwrap(), 1033 Vec::new() 1034 ); 1035 1036 // Regular transaction 1037 routine(&None, &None).await; 1038 1039 // Reserve transaction 1040 routine( 1041 &Some(IncomingSubject::Reserve(EddsaPublicKey::rand())), 1042 &Some(IncomingSubject::Reserve(EddsaPublicKey::rand())), 1043 ) 1044 .await; 1045 1046 // Kyc transaction 1047 routine( 1048 &Some(IncomingSubject::Kyc(EddsaPublicKey::rand())), 1049 &Some(IncomingSubject::Kyc(EddsaPublicKey::rand())), 1050 ) 1051 .await; 1052 1053 // History 1054 assert_eq!( 1055 db::revenue_history(&pool, &History::default(), dummy_listen) 1056 .await 1057 .unwrap() 1058 .len(), 1059 6 1060 ); 1061 assert_eq!( 1062 db::incoming_history(&pool, &History::default(), dummy_listen) 1063 .await 1064 .unwrap() 1065 .len(), 1066 4 1067 ); 1068 } 1069 1070 #[tokio::test] 1071 async fn tx_in_admin() { 1072 let (_, pool) = setup().await; 1073 1074 // Empty db 1075 assert_eq!( 1076 db::incoming_history(&pool, &History::default(), dummy_listen) 1077 .await 1078 .unwrap(), 1079 Vec::new() 1080 ); 1081 1082 let now = now_sql_stable_ts(); 1083 let later = now + Span::new().hours(2); 1084 let date = Zoned::now().date(); 1085 let tx = TxInAdmin { 1086 amount: amount("EUR:10"), 1087 subject: "subject".to_owned(), 1088 debtor: magnet_payto("payto://iban/HU30162000031000163100000000?receiver-name=name"), 1089 metadata: IncomingSubject::Reserve(EddsaPublicKey::rand()), 1090 }; 1091 // Insert 1092 assert_eq!( 1093 register_tx_in_admin(&pool, &tx, &now) 1094 .await 1095 .expect("register tx in"), 1096 AddIncomingResult::Success { 1097 new: true, 1098 pending: false, 1099 row_id: 1, 1100 valued_at: date 1101 } 1102 ); 1103 // Many 1104 assert_eq!( 1105 register_tx_in_admin( 1106 &pool, 1107 &TxInAdmin { 1108 subject: "Other".to_owned(), 1109 metadata: IncomingSubject::Reserve(EddsaPublicKey::rand()), 1110 ..tx.clone() 1111 }, 1112 &later 1113 ) 1114 .await 1115 .expect("register tx in"), 1116 AddIncomingResult::Success { 1117 new: true, 1118 pending: false, 1119 row_id: 2, 1120 valued_at: date 1121 } 1122 ); 1123 1124 // History 1125 assert_eq!( 1126 db::incoming_history(&pool, &History::default(), dummy_listen) 1127 .await 1128 .unwrap() 1129 .len(), 1130 2 1131 ); 1132 } 1133 1134 #[tokio::test] 1135 async fn tx_out() { 1136 let (mut db, pool) = setup().await; 1137 1138 let mut routine = async |first: &TxOutKind, second: &TxOutKind| { 1139 let (id, code) = 1140 sqlx::query("SELECT count(*) + 1, COALESCE(max(magnet_code), 0) + 20 FROM tx_out") 1141 .try_map(|r: PgRow| Ok((r.try_get_u64(0)?, r.try_get_u64(1)?))) 1142 .fetch_one(&mut *db) 1143 .await 1144 .unwrap(); 1145 let now = now_sql_stable_ts(); 1146 let date = Zoned::now().date(); 1147 let later = date.tomorrow().unwrap(); 1148 let tx = TxOut { 1149 code, 1150 amount: amount("HUF:10"), 1151 subject: "subject".into(), 1152 creditor: magnet_payto( 1153 "payto://iban/HU30162000031000163100000000?receiver-name=name", 1154 ), 1155 value_date: date, 1156 status: TxStatus::Completed, 1157 }; 1158 assert_matches!( 1159 make_transfer( 1160 &pool, 1161 &db::Transfer { 1162 request_uid: HashCode::rand(), 1163 amount: decimal("10"), 1164 exchange_base_url: url("https://exchange.test.com/"), 1165 metadata: None, 1166 wtid: ShortHashCode::rand(), 1167 creditor: tx.creditor.clone() 1168 }, 1169 &now 1170 ) 1171 .await 1172 .unwrap(), 1173 TransferResult::Success { .. } 1174 ); 1175 db::initiated_submit_success(&mut db, 1, &Timestamp::now(), tx.code) 1176 .await 1177 .expect("status success"); 1178 1179 // Insert 1180 assert_eq!( 1181 register_tx_out(&mut db, &tx, first, &now) 1182 .await 1183 .expect("register tx out"), 1184 AddOutgoingResult { 1185 result: db::RegisterResult::known, 1186 row_id: id, 1187 } 1188 ); 1189 // Idempotent 1190 assert_eq!( 1191 register_tx_out( 1192 &mut db, 1193 &TxOut { 1194 value_date: later, 1195 ..tx.clone() 1196 }, 1197 first, 1198 &now 1199 ) 1200 .await 1201 .expect("register tx out"), 1202 AddOutgoingResult { 1203 result: db::RegisterResult::idempotent, 1204 row_id: id, 1205 } 1206 ); 1207 // Recovered 1208 assert_eq!( 1209 register_tx_out( 1210 &mut db, 1211 &TxOut { 1212 code: code + 1, 1213 value_date: later, 1214 ..tx.clone() 1215 }, 1216 second, 1217 &now 1218 ) 1219 .await 1220 .expect("register tx out"), 1221 AddOutgoingResult { 1222 result: db::RegisterResult::recovered, 1223 row_id: id + 1, 1224 } 1225 ); 1226 }; 1227 1228 // Empty db 1229 assert_eq!( 1230 db::outgoing_history(&pool, &History::default(), dummy_listen) 1231 .await 1232 .unwrap(), 1233 Vec::new() 1234 ); 1235 1236 // Regular transaction 1237 routine(&TxOutKind::Simple, &TxOutKind::Simple).await; 1238 1239 // Talerable transaction 1240 routine( 1241 &TxOutKind::Talerable(OutgoingSubject::rand()), 1242 &TxOutKind::Talerable(OutgoingSubject::rand()), 1243 ) 1244 .await; 1245 1246 // Bounced transaction 1247 routine(&TxOutKind::Bounce(21), &TxOutKind::Bounce(42)).await; 1248 1249 // History 1250 assert_eq!( 1251 db::outgoing_history(&pool, &History::default(), dummy_listen) 1252 .await 1253 .unwrap() 1254 .len(), 1255 2 1256 ); 1257 } 1258 1259 #[tokio::test] 1260 async fn tx_out_failure() { 1261 let (mut db, pool) = setup().await; 1262 1263 let now = now_sql_stable_ts(); 1264 1265 // Unknown 1266 assert_eq!( 1267 db::register_tx_out_failure(&mut db, 42, None, &now) 1268 .await 1269 .unwrap(), 1270 OutFailureResult { 1271 initiated_id: None, 1272 new: false 1273 } 1274 ); 1275 assert_eq!( 1276 db::register_tx_out_failure(&mut db, 42, Some(12), &now) 1277 .await 1278 .unwrap(), 1279 OutFailureResult { 1280 initiated_id: None, 1281 new: false 1282 } 1283 ); 1284 1285 // Initiated 1286 let req = db::Transfer { 1287 request_uid: HashCode::rand(), 1288 amount: decimal("10"), 1289 exchange_base_url: url("https://exchange.test.com/"), 1290 metadata: None, 1291 wtid: ShortHashCode::rand(), 1292 creditor: magnet_payto("payto://iban/HU30162000031000163100000000?receiver-name=name"), 1293 }; 1294 let payto = magnet_payto("payto://iban/HU30162000031000163100000000?receiver-name=name"); 1295 assert_eq!( 1296 make_transfer(&pool, &req, &now).await.unwrap(), 1297 TransferResult::Success { 1298 id: 1, 1299 initiated_at: now 1300 } 1301 ); 1302 db::initiated_submit_success(&mut db, 1, &Timestamp::now(), 34) 1303 .await 1304 .expect("status success"); 1305 assert_eq!( 1306 db::register_tx_out_failure(&mut db, 34, None, &now) 1307 .await 1308 .unwrap(), 1309 OutFailureResult { 1310 initiated_id: Some(1), 1311 new: true 1312 } 1313 ); 1314 assert_eq!( 1315 db::register_tx_out_failure(&mut db, 34, None, &now) 1316 .await 1317 .unwrap(), 1318 OutFailureResult { 1319 initiated_id: Some(1), 1320 new: false 1321 } 1322 ); 1323 1324 // Recovered bounce 1325 let tx = TxIn { 1326 code: 12, 1327 amount: amount("HUF:11"), 1328 subject: "malformed transaction".into(), 1329 debtor: payto, 1330 value_date: Zoned::now().date(), 1331 status: TxStatus::Completed, 1332 }; 1333 assert_eq!( 1334 db::register_bounce_tx_in(&mut db, &tx, "no reason", &now) 1335 .await 1336 .unwrap(), 1337 BounceResult { 1338 tx_id: 1, 1339 tx_new: true, 1340 bounce_id: 2, 1341 bounce_new: true 1342 } 1343 ); 1344 assert_eq!( 1345 db::register_tx_out_failure(&mut db, 10, Some(12), &now) 1346 .await 1347 .unwrap(), 1348 OutFailureResult { 1349 initiated_id: Some(2), 1350 new: true 1351 } 1352 ); 1353 assert_eq!( 1354 db::register_tx_out_failure(&mut db, 10, Some(12), &now) 1355 .await 1356 .unwrap(), 1357 OutFailureResult { 1358 initiated_id: Some(2), 1359 new: false 1360 } 1361 ); 1362 } 1363 1364 #[tokio::test] 1365 async fn transfer() { 1366 let (_, pool) = setup().await; 1367 1368 // Empty db 1369 assert_eq!(db::transfer_by_id(&pool, 0).await.unwrap(), None); 1370 assert_eq!( 1371 db::transfer_page(&pool, &None, &Page::default()) 1372 .await 1373 .unwrap(), 1374 Vec::new() 1375 ); 1376 1377 let req = db::Transfer { 1378 request_uid: HashCode::rand(), 1379 amount: decimal("10"), 1380 exchange_base_url: url("https://exchange.test.com/"), 1381 metadata: None, 1382 wtid: ShortHashCode::rand(), 1383 creditor: magnet_payto("payto://iban/HU02162000031000164800000000?receiver-name=name"), 1384 }; 1385 let now = now_sql_stable_ts(); 1386 let later = now + Span::new().hours(2); 1387 // Insert 1388 assert_eq!( 1389 make_transfer(&pool, &req, &now).await.expect("transfer"), 1390 TransferResult::Success { 1391 id: 1, 1392 initiated_at: now 1393 } 1394 ); 1395 // Idempotent 1396 assert_eq!( 1397 make_transfer(&pool, &req, &later).await.expect("transfer"), 1398 TransferResult::Success { 1399 id: 1, 1400 initiated_at: now 1401 } 1402 ); 1403 // Request UID reuse 1404 assert_eq!( 1405 make_transfer( 1406 &pool, 1407 &db::Transfer { 1408 wtid: ShortHashCode::rand(), 1409 ..req.clone() 1410 }, 1411 &now 1412 ) 1413 .await 1414 .expect("transfer"), 1415 TransferResult::RequestUidReuse 1416 ); 1417 // wtid reuse 1418 assert_eq!( 1419 make_transfer( 1420 &pool, 1421 &db::Transfer { 1422 request_uid: HashCode::rand(), 1423 ..req.clone() 1424 }, 1425 &now 1426 ) 1427 .await 1428 .expect("transfer"), 1429 TransferResult::WtidReuse 1430 ); 1431 // Many 1432 assert_eq!( 1433 make_transfer( 1434 &pool, 1435 &db::Transfer { 1436 request_uid: HashCode::rand(), 1437 wtid: ShortHashCode::rand(), 1438 ..req 1439 }, 1440 &later 1441 ) 1442 .await 1443 .expect("transfer"), 1444 TransferResult::Success { 1445 id: 2, 1446 initiated_at: later 1447 } 1448 ); 1449 1450 // Get 1451 assert!(db::transfer_by_id(&pool, 1).await.unwrap().is_some()); 1452 assert!(db::transfer_by_id(&pool, 2).await.unwrap().is_some()); 1453 assert!(db::transfer_by_id(&pool, 3).await.unwrap().is_none()); 1454 assert_eq!( 1455 db::transfer_page(&pool, &None, &Page::default()) 1456 .await 1457 .unwrap() 1458 .len(), 1459 2 1460 ); 1461 } 1462 1463 #[tokio::test] 1464 async fn bounce() { 1465 let (mut db, _) = setup().await; 1466 1467 let amount = amount("HUF:10"); 1468 let payto = magnet_payto("payto://iban/HU30162000031000163100000000?receiver-name=name"); 1469 let now = now_sql_stable_ts(); 1470 let date = Zoned::now().date(); 1471 1472 // Empty db 1473 assert!(db::pending_batch(&mut db, &now).await.unwrap().is_empty()); 1474 1475 // Insert 1476 assert_eq!( 1477 register_tx_in( 1478 &mut db, 1479 &TxIn { 1480 code: 13, 1481 amount, 1482 subject: "subject".into(), 1483 debtor: payto.clone(), 1484 value_date: date, 1485 status: TxStatus::Completed 1486 }, 1487 &None, 1488 &now 1489 ) 1490 .await 1491 .expect("register tx in"), 1492 AddIncomingResult::Success { 1493 new: true, 1494 pending: false, 1495 row_id: 1, 1496 valued_at: date 1497 } 1498 ); 1499 1500 // Bounce 1501 assert_eq!( 1502 register_bounce_tx_in( 1503 &mut db, 1504 &TxIn { 1505 code: 12, 1506 amount, 1507 subject: "subject".into(), 1508 debtor: payto.clone(), 1509 value_date: date, 1510 status: TxStatus::Completed 1511 }, 1512 "good reason", 1513 &now 1514 ) 1515 .await 1516 .expect("bounce"), 1517 BounceResult { 1518 tx_id: 2, 1519 tx_new: true, 1520 bounce_id: 1, 1521 bounce_new: true 1522 } 1523 ); 1524 // Idempotent 1525 assert_eq!( 1526 register_bounce_tx_in( 1527 &mut db, 1528 &TxIn { 1529 code: 12, 1530 amount, 1531 subject: "subject".into(), 1532 debtor: payto.clone(), 1533 value_date: date, 1534 status: TxStatus::Completed 1535 }, 1536 "good reason", 1537 &now 1538 ) 1539 .await 1540 .expect("bounce"), 1541 BounceResult { 1542 tx_id: 2, 1543 tx_new: false, 1544 bounce_id: 1, 1545 bounce_new: false 1546 } 1547 ); 1548 1549 // Bounce registered 1550 assert_eq!( 1551 register_bounce_tx_in( 1552 &mut db, 1553 &TxIn { 1554 code: 13, 1555 amount, 1556 subject: "subject".into(), 1557 debtor: payto.clone(), 1558 value_date: date, 1559 status: TxStatus::Completed 1560 }, 1561 "good reason", 1562 &now 1563 ) 1564 .await 1565 .expect("bounce"), 1566 BounceResult { 1567 tx_id: 1, 1568 tx_new: false, 1569 bounce_id: 2, 1570 bounce_new: true 1571 } 1572 ); 1573 // Idempotent registered 1574 assert_eq!( 1575 register_bounce_tx_in( 1576 &mut db, 1577 &TxIn { 1578 code: 13, 1579 amount, 1580 subject: "subject".into(), 1581 debtor: payto.clone(), 1582 value_date: date, 1583 status: TxStatus::Completed 1584 }, 1585 "good reason", 1586 &now 1587 ) 1588 .await 1589 .expect("bounce"), 1590 BounceResult { 1591 tx_id: 1, 1592 tx_new: false, 1593 bounce_id: 2, 1594 bounce_new: false 1595 } 1596 ); 1597 1598 // Batch 1599 assert_eq!( 1600 db::pending_batch(&mut db, &now).await.unwrap(), 1601 &[ 1602 Initiated { 1603 id: 1, 1604 amount, 1605 subject: "bounce: 12".into(), 1606 creditor: payto.clone() 1607 }, 1608 Initiated { 1609 id: 2, 1610 amount, 1611 subject: "bounce: 13".into(), 1612 creditor: payto 1613 } 1614 ] 1615 ); 1616 } 1617 1618 #[tokio::test] 1619 async fn status() { 1620 let (mut db, _) = setup().await; 1621 1622 // Unknown transfer 1623 db::initiated_submit_permanent_failure(&mut db, 1, &Timestamp::now(), "msg") 1624 .await 1625 .unwrap(); 1626 db::initiated_submit_success(&mut db, 1, &Timestamp::now(), 12) 1627 .await 1628 .unwrap(); 1629 } 1630 1631 #[tokio::test] 1632 async fn batch() { 1633 let (mut db, pool) = setup().await; 1634 let start = Timestamp::now(); 1635 let magnet_payto = 1636 magnet_payto("payto://iban/HU30162000031000163100000000?receiver-name=name"); 1637 1638 // Empty db 1639 let pendings = db::pending_batch(&mut db, &start) 1640 .await 1641 .expect("pending_batch"); 1642 assert_eq!(pendings.len(), 0); 1643 1644 // Some transfers 1645 for i in 0..3 { 1646 make_transfer( 1647 &pool, 1648 &db::Transfer { 1649 request_uid: HashCode::rand(), 1650 amount: decimal(format!("{}", i + 1)), 1651 exchange_base_url: url("https://exchange.test.com/"), 1652 metadata: None, 1653 wtid: ShortHashCode::rand(), 1654 creditor: magnet_payto.clone(), 1655 }, 1656 &Timestamp::now(), 1657 ) 1658 .await 1659 .expect("transfer"); 1660 } 1661 let pendings = db::pending_batch(&mut db, &start) 1662 .await 1663 .expect("pending_batch"); 1664 assert_eq!(pendings.len(), 3); 1665 1666 // Max 100 txs in batch 1667 for i in 0..100 { 1668 make_transfer( 1669 &pool, 1670 &db::Transfer { 1671 request_uid: HashCode::rand(), 1672 amount: decimal(format!("{}", i + 1)), 1673 exchange_base_url: url("https://exchange.test.com/"), 1674 metadata: None, 1675 wtid: ShortHashCode::rand(), 1676 creditor: magnet_payto.clone(), 1677 }, 1678 &Timestamp::now(), 1679 ) 1680 .await 1681 .expect("transfer"); 1682 } 1683 let pendings = db::pending_batch(&mut db, &start) 1684 .await 1685 .expect("pending_batch"); 1686 assert_eq!(pendings.len(), 100); 1687 1688 // Skip uploaded 1689 for i in 0..=10 { 1690 db::initiated_submit_success(&mut db, i, &Timestamp::now(), i) 1691 .await 1692 .expect("status success"); 1693 } 1694 let pendings = db::pending_batch(&mut db, &start) 1695 .await 1696 .expect("pending_batch"); 1697 assert_eq!(pendings.len(), 93); 1698 1699 // Skip failed 1700 for i in 0..=10 { 1701 db::initiated_submit_permanent_failure(&mut db, 10 + i, &Timestamp::now(), "failure") 1702 .await 1703 .expect("status failure"); 1704 } 1705 let pendings = db::pending_batch(&mut db, &start) 1706 .await 1707 .expect("pending_batch"); 1708 assert_eq!(pendings.len(), 83); 1709 } 1710 }