db.rs (50213B)
/*
  This file is part of TALER
  Copyright (C) 2025, 2026 Taler Systems SA

  TALER is free software; you can redistribute it and/or modify it under the
  terms of the GNU Affero General Public License as published by the Free Software
  Foundation; either version 3, or (at your option) any later version.

  TALER is distributed in the hope that it will be useful, but WITHOUT ANY
  WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
  A PARTICULAR PURPOSE.  See the GNU Affero General Public License for more details.

  You should have received a copy of the GNU Affero General Public License along with
  TALER; see the file COPYING.  If not, see <http://www.gnu.org/licenses/>
*/

use std::fmt::Display;

use compact_str::CompactString;
use jiff::{Timestamp, civil::Date, tz::TimeZone};
use serde::{Serialize, de::DeserializeOwned};
use sqlx::{PgConnection, PgPool, QueryBuilder, Row, postgres::PgRow};
use taler_api::{
    db::{BindHelper, TypeHelper, history, page},
    serialized,
    subject::{IncomingSubject, OutgoingSubject, fmt_out_subject},
};
use taler_common::{
    api::{
        HashCode, ShortHashCode,
        params::{History, Page},
        prepared::{RegistrationRequest, Unregistration},
        revenue::RevenueIncomingBankTransaction,
        wire::{
            IncomingBankTransaction, OutgoingBankTransaction, TransferListStatus, TransferState,
            TransferStatus,
        },
    },
    config::Config,
    db::IncomingType,
    types::{
        amount::{Amount, Decimal},
        payto::PaytoImpl as _,
    },
};
use tokio::sync::watch::{Receiver, Sender};
use url::Url;

use crate::{FullHuPayto, config::parse_db_cfg, constants::CURR, magnet_api::types::TxStatus};

/// Schema name passed to `taler_common::db::pool` by [`pool`] and [`dbinit`].
const SCHEMA: &str = "magnet_bank";

/// Open a connection pool for the database described in `cfg`.
///
/// The database settings are extracted with `parse_db_cfg`; errors from the
/// parsing step or from `taler_common::db::pool` are propagated to the caller.
pub async fn pool(cfg: &Config) -> anyhow::Result<PgPool> {
    let db = parse_db_cfg(cfg)?;
    let pool = taler_common::db::pool(db.cfg, SCHEMA).await?;
    Ok(pool)
}

/// Open a connection pool the same way as [`pool`], then run
/// `taler_common::db::dbinit` on one connection acquired from it, passing the
/// configured SQL directory, the name `"magnet-bank"` and `reset`.
///
/// Returns the pool once the initialisation call succeeded.
// NOTE(review): `reset` is forwarded untouched; presumably it asks `dbinit` to
// drop existing state first — confirm against `taler_common::db::dbinit`.
pub async fn dbinit(cfg: &Config, reset: bool) ->
anyhow::Result<PgPool> {
    let db_cfg = parse_db_cfg(cfg)?;
    let pool = taler_common::db::pool(db_cfg.cfg, SCHEMA).await?;
    // A single connection is checked out of the pool for the initialisation call
    let mut db = pool.acquire().await?;
    taler_common::db::dbinit(&mut db, db_cfg.sql_dir.as_ref(), "magnet-bank", reset).await?;
    Ok(pool)
}

/// Forward database notifications to watch channels.
///
/// Expands `taler_api::notification::notification_listener!` on `pool` for the
/// four channels `tx_in`, `taler_in`, `tx_out` and `taler_out`. Each
/// notification carries an `i64` row id which is stored in the matching sender
/// with `send_replace`.
pub async fn notification_listener(
    pool: PgPool,
    in_channel: Sender<i64>,
    taler_in_channel: Sender<i64>,
    out_channel: Sender<i64>,
    taler_out_channel: Sender<i64>,
) {
    taler_api::notification::notification_listener!(&pool,
        "tx_in" => (row_id: i64) {
            in_channel.send_replace(row_id);
        },
        "taler_in" => (row_id: i64) {
            taler_in_channel.send_replace(row_id);
        },
        "tx_out" => (row_id: i64) {
            out_channel.send_replace(row_id);
        },
        "taler_out" => (row_id: i64) {
            taler_out_channel.send_replace(row_id);
        }
    )
}

/// Incoming transaction as consumed by [`register_tx_in`] and
/// [`register_bounce_tx_in`].
///
/// `code` is bound to the SQL functions as an `i64`; `status` is only used by
/// the [`Display`] implementation in this file.
#[derive(Debug, Clone)]
pub struct TxIn {
    pub code: u64,
    pub amount: Amount,
    pub subject: Box<str>,
    pub debtor: FullHuPayto,
    pub value_date: Date,
    pub status: TxStatus,
}

/// Renders as `{value_date} {code} {amount} ({bban} {name}) {status:?} '{subject}'`
/// where `bban` and `name` come from the debtor.
impl Display for TxIn {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Destructure exhaustively so that adding a field forces this impl to be revisited
        let Self {
            code,
            amount,
            subject,
            debtor,
            value_date,
            status,
        } = self;
        write!(
            f,
            "{value_date} {code} {amount} ({} {}) {status:?} '{subject}'",
            debtor.bban(),
            debtor.name
        )
    }
}

/// Outgoing transaction as consumed by [`register_tx_out`].
///
/// `code` is bound to the SQL function as an `i64`; `status` is only used by
/// the [`Display`] implementation in this file.
#[derive(Debug, Clone)]
pub struct TxOut {
    pub code: u64,
    pub amount: Amount,
    pub subject: Box<str>,
    pub creditor: FullHuPayto,
    pub value_date: Date,
    pub status: TxStatus,
}

/// Renders as `{value_date} {code} {amount} ({bban} {name}) {status:?} '{subject}'`
/// where `bban` and `name` come from the creditor.
impl Display for TxOut {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Destructure exhaustively so that adding a field forces this impl to be revisited
        let Self {
            code,
            amount,
            subject,
            creditor,
            value_date,
            status,
        } = self;
        write!(
            f,
            "{value_date} {code} {amount} ({} {}) {status:?} '{subject}'",
            creditor.bban(),
            &creditor.name
        )
    }
}

#[derive(Debug, PartialEq,
Eq)]
/// Row of the `initiated` table as returned by [`pending_batch`] and
/// [`initiated_by_code`].
pub struct Initiated {
    pub id: u64,
    pub amount: Amount,
    pub subject: Box<str>,
    pub creditor: FullHuPayto,
}

/// Renders as `{id} {amount} ({bban} {name}) '{subject}'` where `bban` and
/// `name` come from the creditor.
impl Display for Initiated {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Destructure exhaustively so that adding a field forces this impl to be revisited
        let Self {
            id,
            amount,
            subject,
            creditor,
        } = self;
        write!(
            f,
            "{id} {amount} ({} {}) '{subject}'",
            creditor.bban(),
            &creditor.name
        )
    }
}

/// Input of [`register_tx_in_admin`].
///
/// Unlike [`TxIn`] there is no code, value date or status; `metadata` supplies
/// the type and key bound to the SQL call.
#[derive(Debug, Clone)]
pub struct TxInAdmin {
    pub amount: Amount,
    pub subject: String,
    pub debtor: FullHuPayto,
    pub metadata: IncomingSubject,
}

/// Lock the database for worker execution
///
/// Runs `SELECT pg_try_advisory_lock(42)` on `e` and returns the boolean found
/// in the first column of the single result row.
pub async fn worker_lock(e: &mut PgConnection) -> sqlx::Result<bool> {
    sqlx::query("SELECT pg_try_advisory_lock(42)")
        .try_map(|r: PgRow| r.try_get(0))
        .fetch_one(e)
        .await
}

/// Outcome of the `register_tx_in` SQL function as decoded by
/// [`register_tx_in_admin`] and [`register_tx_in`].
#[derive(Debug, PartialEq, Eq)]
pub enum AddIncomingResult {
    /// None of the three error flags was set; the remaining output columns
    /// (`out_tx_row_id`, `out_valued_at`, `out_new`, `out_pending`) are decoded
    Success {
        new: bool,
        pending: bool,
        row_id: u64,
        valued_at: Date,
    },
    /// The `out_reserve_pub_reuse` flag was set
    ReservePubReuse,
    /// The `out_unknown_mapping` flag was set
    UnknownMapping,
    /// The `out_mapping_reuse` flag was set
    MappingReuse,
}

/// Register an incoming transaction that has no magnet code.
///
/// Calls the `register_tx_in` SQL function with `NULL` as first argument. The
/// UTC civil date of `now` is bound once (as `$5`) and used for two arguments:
/// the one [`register_tx_in`] fills with the value date and the last one, which
/// [`register_tx_in`] fills with its `now` timestamp.
///
/// The flags are checked in the order reserve pub reuse, mapping reuse, unknown
/// mapping; the first one set decides the returned variant.
pub async fn register_tx_in_admin(
    db: &PgPool,
    tx: &TxInAdmin,
    now: &Timestamp,
) -> sqlx::Result<AddIncomingResult> {
    serialized!(
        sqlx::query(
            "
                SELECT out_reserve_pub_reuse, out_mapping_reuse, out_unknown_mapping, out_tx_row_id, out_valued_at, out_new, out_pending
                FROM register_tx_in(NULL, $1, $2, $3, $4, $5, $6, $7, $5)
            ",
        )
        .bind(tx.amount)
        .bind(&tx.subject)
        .bind(tx.debtor.iban())
        .bind(&tx.debtor.name)
        .bind_date(&now.to_zoned(TimeZone::UTC).date())
        .bind(tx.metadata.ty())
        .bind(tx.metadata.key())
        .try_map(|r: PgRow| {
            Ok(if r.try_get_flag(0)? {
                AddIncomingResult::ReservePubReuse
            } else if r.try_get_flag(1)? {
                AddIncomingResult::MappingReuse
            } else if r.try_get_flag(2)?
{
                AddIncomingResult::UnknownMapping
            } else {
                // No error flag set: decode the remaining output columns
                AddIncomingResult::Success {
                    row_id: r.try_get_u64(3)?,
                    valued_at: r.try_get_date(4)?,
                    new: r.try_get(5)?,
                    pending: r.try_get(6)?
                }
            })
        })
        .fetch_one(db)
    )
}

/// Register an incoming transaction identified by its magnet code.
///
/// Calls the `register_tx_in` SQL function with, in order: the code (cast to
/// `i64`), amount, subject, debtor IBAN, debtor name, value date, the type and
/// key of `subject` (both NULL when `subject` is `None`) and `now`.
///
/// The flags are checked in the order reserve pub reuse, mapping reuse, unknown
/// mapping; the first one set decides the returned variant.
pub async fn register_tx_in(
    db: &mut PgConnection,
    tx: &TxIn,
    subject: &Option<IncomingSubject>,
    now: &Timestamp,
) -> sqlx::Result<AddIncomingResult> {
    serialized!(
        sqlx::query(
            "
                SELECT out_reserve_pub_reuse, out_mapping_reuse, out_unknown_mapping, out_tx_row_id, out_valued_at, out_new, out_pending
                FROM register_tx_in($1, $2, $3, $4, $5, $6, $7, $8, $9)
            ",
        )
        .bind(tx.code as i64)
        .bind(tx.amount)
        .bind(&tx.subject)
        .bind(tx.debtor.iban())
        .bind(&tx.debtor.name)
        .bind_date(&tx.value_date)
        .bind(subject.as_ref().map(|it| it.ty()))
        .bind(subject.as_ref().map(|it| it.key()))
        .bind_timestamp(now)
        .try_map(|r: PgRow| {
            Ok(if r.try_get_flag(0)? {
                AddIncomingResult::ReservePubReuse
            } else if r.try_get_flag(1)? {
                AddIncomingResult::MappingReuse
            } else if r.try_get_flag(2)? {
                AddIncomingResult::UnknownMapping
            } else {
                // No error flag set: decode the remaining output columns
                AddIncomingResult::Success {
                    row_id: r.try_get_u64(3)?,
                    valued_at: r.try_get_date(4)?,
                    new: r.try_get(5)?,
                    pending: r.try_get(6)?
}
            })
        })
        .fetch_one(&mut *db)
    )
}

/// Selects which optional arguments [`register_tx_out`] passes to the
/// `register_tx_out` SQL function.
#[derive(Debug)]
pub enum TxOutKind {
    /// All four optional arguments are bound as NULL
    Simple,
    /// Only the last optional argument is set, to the wrapped value cast to `i64`
    Bounce(u32),
    /// The wtid, exchange base URL and metadata of the subject are bound; the
    /// last optional argument is NULL
    Talerable(OutgoingSubject),
}

/// Value of the `out_result` column returned by the `register_tx_out` SQL
/// function, mapped onto the `register_result` SQL type.
#[derive(Debug, Clone, Copy, PartialEq, Eq, sqlx::Type)]
// Lowercase variant names are kept as-is.
// NOTE(review): presumably they have to match the SQL enum labels — confirm
// against the `register_result` type definition.
#[allow(non_camel_case_types)]
#[sqlx(type_name = "register_result")]
pub enum RegisterResult {
    /// Already registered
    idempotent,
    /// Initiated transaction
    known,
    /// Recovered unknown outgoing transaction
    recovered,
}

/// Decoded result row of [`register_tx_out`]: `out_result` and `out_tx_row_id`.
#[derive(Debug, PartialEq, Eq)]
pub struct AddOutgoingResult {
    pub result: RegisterResult,
    pub row_id: u64,
}

/// Register an outgoing transaction.
///
/// Calls the `register_tx_out` SQL function with, in order: the code (cast to
/// `i64`), amount, subject, creditor IBAN, creditor name, value date, four
/// optional arguments chosen by `kind` (see [`TxOutKind`]) and `now`.
pub async fn register_tx_out(
    db: &mut PgConnection,
    tx: &TxOut,
    kind: &TxOutKind,
    now: &Timestamp,
) -> sqlx::Result<AddOutgoingResult> {
    serialized!({
        let query = sqlx::query(
            "
                SELECT out_result, out_tx_row_id
                FROM register_tx_out($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
            ",
        )
        .bind(tx.code as i64)
        .bind(tx.amount)
        .bind(&tx.subject)
        .bind(tx.creditor.iban())
        .bind(&tx.creditor.name)
        .bind_date(&tx.value_date);
        // Every arm binds exactly four values ($7 to $10) so that `now` always lands on $11
        let query = match kind {
            TxOutKind::Simple => query
                .bind(None::<&[u8]>)
                .bind(None::<&str>)
                .bind(None::<&str>)
                .bind(None::<i64>),
            TxOutKind::Bounce(bounced) => query
                .bind(None::<&[u8]>)
                .bind(None::<&str>)
                .bind(None::<&str>)
                .bind(*bounced as i64),
            TxOutKind::Talerable(subject) => query
                .bind(&subject.wtid)
                .bind(subject.exchange_base_url.as_str())
                .bind(&subject.metadata)
                .bind(None::<i64>),
        };
        query
            .bind_timestamp(now)
            .try_map(|r: PgRow| {
                Ok(AddOutgoingResult {
                    result: r.try_get(0)?,
                    row_id: r.try_get_u64(1)?,
                })
            })
            .fetch_one(&mut *db)
    })
}

/// Decoded result row of [`register_tx_out_failure`]: `out_initiated_id`
/// (possibly NULL) and `out_new`.
#[derive(Debug, PartialEq, Eq)]
pub struct OutFailureResult {
    pub initiated_id: Option<u64>,
    pub new: bool,
}

/// Register the failure of an outgoing transaction.
///
/// Calls the `register_tx_out_failure` SQL function with the code (cast to
/// `i64`), the optional `bounced` value (cast to `i32`) and `now`.
// NOTE(review): `bounced` is cast to `i32` here while `register_tx_out` binds
// the bounced value of `TxOutKind::Bounce` as `i64` — confirm both match the
// SQL signatures.
pub async fn register_tx_out_failure(
    db: &mut
PgConnection,
    code: u64,
    bounced: Option<u32>,
    now: &Timestamp,
) -> sqlx::Result<OutFailureResult> {
    serialized!(
        sqlx::query(
            "
                SELECT out_new, out_initiated_id
                FROM register_tx_out_failure($1, $2, $3)
            ",
        )
        .bind(code as i64)
        .bind(bounced.map(|i| i as i32))
        .bind_timestamp(now)
        .try_map(|r: PgRow| {
            Ok(OutFailureResult {
                new: r.try_get(0)?,
                // `out_initiated_id` may be NULL, hence the explicit `Option<i64>`
                initiated_id: r.try_get::<Option<i64>, _>(1)?.map(|i| i as u64),
            })
        })
        .fetch_one(&mut *db)
    )
}

/// Outcome of the `taler_transfer` SQL function as decoded by [`make_transfer`].
#[derive(Debug, PartialEq, Eq)]
pub enum TransferResult {
    /// Neither reuse flag was set; carries `out_initiated_row_id` and `out_initiated_at`
    Success { id: u64, initiated_at: Timestamp },
    /// The `out_request_uid_reuse` flag was set
    RequestUidReuse,
    /// The `out_wtid_reuse` flag was set
    WtidReuse,
}

/// Input of [`make_transfer`].
#[derive(Debug, Clone)]
pub struct Transfer {
    pub request_uid: HashCode,
    pub amount: Decimal,
    pub exchange_base_url: Url,
    pub metadata: Option<CompactString>,
    pub wtid: ShortHashCode,
    pub creditor: FullHuPayto,
}

/// Initiate a transfer through the `taler_transfer` SQL function.
///
/// The subject is built with `fmt_out_subject` from the wtid, the exchange base
/// URL and the optional metadata, then bound along with the request uid, wtid,
/// amount, exchange base URL, metadata, creditor IBAN, creditor name and `now`.
///
/// The request uid reuse flag is checked before the wtid reuse flag.
pub async fn make_transfer(
    db: &PgPool,
    tx: &Transfer,
    now: &Timestamp,
) -> sqlx::Result<TransferResult> {
    let subject = fmt_out_subject(&tx.wtid, &tx.exchange_base_url, tx.metadata.as_deref());
    serialized!(
        sqlx::query(
            "
                SELECT out_request_uid_reuse, out_wtid_reuse, out_initiated_row_id, out_initiated_at
                FROM taler_transfer($1, $2, $3, $4, $5, $6, $7, $8, $9)
            ",
        )
        .bind(&tx.request_uid)
        .bind(&tx.wtid)
        .bind(&subject)
        .bind(tx.amount)
        .bind(tx.exchange_base_url.as_str())
        .bind(&tx.metadata)
        .bind(tx.creditor.iban())
        .bind(&tx.creditor.name)
        .bind_timestamp(now)
        .try_map(|r: PgRow| {
            Ok(if r.try_get_flag(0)? {
                TransferResult::RequestUidReuse
            } else if r.try_get_flag(1)?
{
                TransferResult::WtidReuse
            } else {
                // No reuse flag set: decode the initiated row id and timestamp
                TransferResult::Success {
                    id: r.try_get_u64(2)?,
                    initiated_at: r.try_get_timestamp(3)?,
                }
            })
        })
        .fetch_one(db)
    )
}

/// Decoded result row of [`register_bounce_tx_in`]: row id and "new" flag for
/// both the incoming transaction and its bounce.
#[derive(Debug, PartialEq, Eq)]
pub struct BounceResult {
    pub tx_id: u64,
    pub tx_new: bool,
    pub bounce_id: u64,
    pub bounce_new: bool,
}

/// Register an incoming transaction together with a bounce.
///
/// Calls the `register_bounce_tx_in` SQL function with, in order: the code
/// (cast to `i64`), amount, subject, debtor IBAN, debtor name, value date,
/// `reason` and `now`.
pub async fn register_bounce_tx_in(
    db: &mut PgConnection,
    tx: &TxIn,
    reason: &str,
    now: &Timestamp,
) -> sqlx::Result<BounceResult> {
    serialized!(
        sqlx::query(
            "
                SELECT out_tx_row_id, out_tx_new, out_bounce_row_id, out_bounce_new
                FROM register_bounce_tx_in($1, $2, $3, $4, $5, $6, $7, $8)
            ",
        )
        .bind(tx.code as i64)
        .bind(tx.amount)
        .bind(&tx.subject)
        .bind(tx.debtor.iban())
        .bind(&tx.debtor.name)
        .bind_date(&tx.value_date)
        .bind(reason)
        .bind_timestamp(now)
        .try_map(|r: PgRow| {
            Ok(BounceResult {
                tx_id: r.try_get_u64(0)?,
                tx_new: r.try_get(1)?,
                bounce_id: r.try_get_u64(2)?,
                bounce_new: r.try_get(3)?,
            })
        })
        .fetch_one(&mut *db)
    )
}

/// List transfers, optionally restricted to one `status`.
///
/// Delegates to the `page` helper with the `initiated_id` column name, a query
/// prefix selecting from `transfer JOIN initiated` and a row decoder producing
/// [`TransferListStatus`]. Amounts are decoded with the `CURR` currency.
// NOTE(review): the query prefix ends on `WHERE` (plus ` status = $n AND ` when
// a status is given), so `page` presumably appends its own conditions — confirm
// against `taler_api::db::page`.
pub async fn transfer_page(
    db: &PgPool,
    status: &Option<TransferState>,
    params: &Page,
) -> sqlx::Result<Vec<TransferListStatus>> {
    page(
        db,
        params,
        "initiated_id",
        || {
            let mut builder = QueryBuilder::new(
                "
                SELECT
                    initiated_id,
                    status,
                    amount,
                    credit_account,
                    credit_name,
                    initiated_at
                FROM transfer
                JOIN initiated USING (initiated_id)
                WHERE
            ",
            );
            if let Some(status) = status {
                builder.push(" status = ").push_bind(status).push(" AND ");
            }
            builder
        },
        |r: PgRow| {
            Ok(TransferListStatus {
                row_id: r.try_get_u64(0)?,
                status: r.try_get(1)?,
                amount: r.try_get_amount(2, &CURR)?,
                credit_account: r.try_get_iban(3)?.as_full_uri(r.try_get(4)?),
                timestamp: r.try_get_timestamp(5)?.into(),
            })
        },
    )
    .await
}

/// History of outgoing Taler transactions.
///
/// Delegates to the `history` helper with the `tx_out_id` column name, a query
/// prefix selecting from `taler_out JOIN tx_out` and a row decoder producing
/// [`OutgoingBankTransaction`]. Amounts are decoded with the `CURR` currency
/// and `debit_fee` is always `None`.
// NOTE(review): the query prefix ends on `WHERE`, so `history` presumably
// appends its own conditions — confirm against `taler_api::db::history`.
pub async fn outgoing_history(
    db: &PgPool,
    params: &History,
    listen: impl FnOnce() -> Receiver<i64>,
) -> sqlx::Result<Vec<OutgoingBankTransaction>> {
    history(
        db,
        "tx_out_id",
        params,
        listen,
        || {
            QueryBuilder::new(
                "
                SELECT
                    tx_out_id,
                    amount,
                    credit_account,
                    credit_name,
                    valued_at,
                    exchange_base_url,
                    metadata,
                    wtid
                FROM taler_out
                JOIN tx_out USING (tx_out_id)
                WHERE
            ",
            )
        },
        |r: PgRow| {
            Ok(OutgoingBankTransaction {
                row_id: r.try_get_u64(0)?,
                amount: r.try_get_amount(1, &CURR)?,
                debit_fee: None,
                credit_account: r.try_get_iban(2)?.as_full_uri(r.try_get(3)?),
                date: r.try_get_timestamp(4)?.into(),
                exchange_base_url: r.try_get_url(5)?,
                metadata: r.try_get(6)?,
                wtid: r.try_get(7)?,
            })
        },
    )
    .await
}

/// History of incoming Taler transactions.
///
/// Delegates to the `history` helper with the `tx_in_id` column name, a query
/// prefix selecting from `taler_in JOIN tx_in` and a row decoder that switches
/// on the `type` column: `reserve` and `kyc` rows become the matching
/// [`IncomingBankTransaction`] variant, a `map` row hits `unimplemented!` and
/// therefore panics.
// NOTE(review): the query prefix ends on `WHERE`, so `history` presumably
// appends its own conditions — confirm against `taler_api::db::history`.
pub async fn incoming_history(
    db: &PgPool,
    params: &History,
    listen: impl FnOnce() -> Receiver<i64>,
) -> sqlx::Result<Vec<IncomingBankTransaction>> {
    history(
        db,
        "tx_in_id",
        params,
        listen,
        || {
            QueryBuilder::new(
                "
                SELECT
                    type,
                    tx_in_id,
                    amount,
                    debit_account,
                    debit_name,
                    valued_at,
                    metadata,
                    authorization_pub,
                    authorization_sig
                FROM taler_in
                JOIN tx_in USING (tx_in_id)
                WHERE
            ",
            )
        },
        |r: PgRow| {
            Ok(match r.try_get(0)?
{
                // Both listed kinds read the same columns; only the name of the
                // field fed from column 6 (`metadata`) differs
                IncomingType::reserve => IncomingBankTransaction::Reserve {
                    row_id: r.try_get_u64(1)?,
                    amount: r.try_get_amount(2, &CURR)?,
                    credit_fee: None,
                    debit_account: r.try_get_iban(3)?.as_full_uri(r.try_get(4)?),
                    date: r.try_get_timestamp(5)?.into(),
                    reserve_pub: r.try_get(6)?,
                    authorization_pub: r.try_get(7)?,
                    authorization_sig: r.try_get(8)?,
                },
                IncomingType::kyc => IncomingBankTransaction::Kyc {
                    row_id: r.try_get_u64(1)?,
                    amount: r.try_get_amount(2, &CURR)?,
                    credit_fee: None,
                    debit_account: r.try_get_iban(3)?.as_full_uri(r.try_get(4)?),
                    date: r.try_get_timestamp(5)?.into(),
                    account_pub: r.try_get(6)?,
                    authorization_pub: r.try_get(7)?,
                    authorization_sig: r.try_get(8)?,
                },
                // Panics instead of returning an error if such a row is ever selected
                IncomingType::map => unimplemented!("MAP are never listed in the history"),
            })
        },
    )
    .await
}

/// History of all incoming transactions.
///
/// Delegates to the `history` helper with the `tx_in_id` column name, a query
/// prefix selecting from `tx_in` alone and a row decoder producing
/// [`RevenueIncomingBankTransaction`]. Amounts are decoded with the `CURR`
/// currency and `credit_fee` is always `None`.
// NOTE(review): the query prefix ends on `WHERE`, so `history` presumably
// appends its own conditions — confirm against `taler_api::db::history`.
pub async fn revenue_history(
    db: &PgPool,
    params: &History,
    listen: impl FnOnce() -> Receiver<i64>,
) -> sqlx::Result<Vec<RevenueIncomingBankTransaction>> {
    history(
        db,
        "tx_in_id",
        params,
        listen,
        || {
            QueryBuilder::new(
                "
                SELECT
                    tx_in_id,
                    valued_at,
                    amount,
                    debit_account,
                    debit_name,
                    subject
                FROM tx_in
                WHERE
            ",
            )
        },
        |r: PgRow| {
            Ok(RevenueIncomingBankTransaction {
                row_id: r.try_get_u64(0)?,
                date: r.try_get_timestamp(1)?.into(),
                amount: r.try_get_amount(2, &CURR)?,
                credit_fee: None,
                debit_account: r.try_get_iban(3)?.as_full_uri(r.try_get(4)?),
                subject: r.try_get(5)?,
            })
        },
    )
    .await
}

/// Fetch the status of the transfer whose `initiated_id` equals `id`.
///
/// Returns `None` when the query yields no row. `id` is cast to `i64` before
/// being bound.
pub async fn transfer_by_id(db: &PgPool, id: u64) -> sqlx::Result<Option<TransferStatus>> {
    serialized!(
        sqlx::query(
            "
                SELECT
                    status,
                    status_msg,
                    amount,
                    exchange_base_url,
                    metadata,
                    wtid,
                    credit_account,
                    credit_name,
                    initiated_at
                FROM transfer
                JOIN initiated USING (initiated_id)
                WHERE initiated_id = $1
",
        )
        .bind(id as i64)
        .try_map(|r: PgRow| {
            Ok(TransferStatus {
                status: r.try_get(0)?,
                status_msg: r.try_get(1)?,
                amount: r.try_get_amount(2, &CURR)?,
                origin_exchange_url: r.try_get(3)?,
                metadata: r.try_get(4)?,
                wtid: r.try_get(5)?,
                credit_account: r.try_get_iban(6)?.as_full_uri(r.try_get(7)?),
                timestamp: r.try_get_timestamp(8)?.into(),
            })
        })
        .fetch_optional(db)
    )
}

/**
 * Get a batch of pending initiated transactions not attempted since [start]
 *
 * Selects at most 100 rows of `initiated` that have no magnet code, are in the
 * `pending` status and whose `last_submitted` is NULL or older than [start].
 * The query has no ORDER BY, so which rows make up the batch is up to the
 * database.
 */
pub async fn pending_batch(
    db: &mut PgConnection,
    start: &Timestamp,
) -> sqlx::Result<Vec<Initiated>> {
    serialized!(
        sqlx::query(
            "
                SELECT initiated_id, amount, subject, credit_account, credit_name
                FROM initiated
                WHERE magnet_code IS NULL
                    AND status='pending'
                    AND (last_submitted IS NULL OR last_submitted < $1)
                LIMIT 100
            ",
        )
        .bind_timestamp(start)
        .try_map(|r: PgRow| {
            Ok(Initiated {
                id: r.try_get_u64(0)?,
                amount: r.try_get_amount(1, &CURR)?,
                subject: r.try_get(2)?,
                creditor: FullHuPayto::new(r.try_get_parse(3)?, r.try_get(4)?),
            })
        })
        .fetch_all(&mut *db)
    )
}

/**
 * Get an initiated transaction matching the given magnet [code]
 *
 * Returns `None` when no row of `initiated` carries this code. [code] is cast
 * to `i64` before being bound.
 */
pub async fn initiated_by_code(
    db: &mut PgConnection,
    code: u64,
) -> sqlx::Result<Option<Initiated>> {
    serialized!(
        sqlx::query(
            // Plain equality: PostgreSQL only accepts `IS` in front of NULL,
            // TRUE, FALSE, UNKNOWN or [NOT] DISTINCT FROM, so `IS $1` is
            // rejected as a syntax error. This matches the predicate used by
            // `initiated_exists_for_code`.
            "
                SELECT initiated_id, amount, subject, credit_account, credit_name
                FROM initiated
                WHERE magnet_code = $1
            ",
        )
        .bind(code as i64)
        .try_map(|r: PgRow| {
            Ok(Initiated {
                id: r.try_get_u64(0)?,
                amount: r.try_get_amount(1, &CURR)?,
                subject: r.try_get(2)?,
                creditor: FullHuPayto::new(r.try_get_parse(3)?, r.try_get(4)?),
            })
        })
        .fetch_optional(&mut *db)
    )
}

/**
 * Update status of a successful submitted initiated transaction
 *
 * Sets the status to `pending`, increments `submission_counter` and stores the
 * submission timestamp and the magnet code on the row matching the given id.
 */
pub async fn initiated_submit_success(
    db: &mut
PgConnection,
    id: u64,
    timestamp: &Timestamp,
    magnet_code: u64,
) -> sqlx::Result<()> {
    serialized!(
        sqlx::query(
            "
                UPDATE initiated
                SET status='pending', submission_counter=submission_counter+1, last_submitted=$1, magnet_code=$2
                WHERE initiated_id=$3
            "
        ).bind_timestamp(timestamp)
        .bind(magnet_code as i64)
        .bind(id as i64)
        .execute(&mut *db)
    )?;
    // The execution result (affected row count) is discarded
    Ok(())
}

/**
 * Update status of a permanently failed initiated transaction
 *
 * Sets the status to `permanent_failure` and stores [msg] as `status_msg` on
 * the row matching [id].
 */
// NOTE(review): `timestamp` is bound as `$1` but the statement only references
// `$2` and `$3`, so the value is currently unused — confirm whether it should
// be stored (e.g. in `last_submitted`) or the parameter dropped.
pub async fn initiated_submit_permanent_failure(
    db: &mut PgConnection,
    id: u64,
    timestamp: &Timestamp,
    msg: &str,
) -> sqlx::Result<()> {
    serialized!(
        sqlx::query(
            "
                UPDATE initiated
                SET status='permanent_failure', status_msg=$2
                WHERE initiated_id=$3
            ",
        )
        .bind_timestamp(timestamp)
        .bind(msg)
        .bind(id as i64)
        .execute(&mut *db)
    )?;
    // The execution result (affected row count) is discarded
    Ok(())
}

/**
 * Check if an initiated transaction exists for a magnet code
 *
 * Returns the `initiated_id` of the matching row, or `None` when the query
 * yields no row.
 */
pub async fn initiated_exists_for_code(
    db: &mut PgConnection,
    code: u64,
) -> sqlx::Result<Option<u64>> {
    serialized!(
        sqlx::query("SELECT initiated_id FROM initiated WHERE magnet_code=$1")
            .bind(code as i64)
            .try_map(|r| Ok(r.try_get::<i64, _>(0)?
as u64))
            .fetch_optional(&mut *db)
    )
}

/**
 * Get JSON value from KV table
 *
 * Returns `None` when no row has the given [key]; otherwise the `value` column
 * is decoded through `sqlx::types::Json<T>`.
 */
pub async fn kv_get<T: DeserializeOwned + Unpin + Send>(
    db: &mut PgConnection,
    key: &str,
) -> sqlx::Result<Option<T>> {
    serialized!(
        sqlx::query("SELECT value FROM kv WHERE key=$1")
            .bind(key)
            .try_map(|r| Ok(r.try_get::<sqlx::types::Json<T>, _>(0)?.0))
            .fetch_optional(&mut *db)
    )
}

/**
 * Set JSON value in KV table
 *
 * Inserts the pair, or overwrites the stored value when [key] already exists
 * (`ON CONFLICT (key) DO UPDATE`).
 */
pub async fn kv_set<T: Serialize>(db: &mut PgConnection, key: &str, value: &T) -> sqlx::Result<()> {
    serialized!(
        sqlx::query("INSERT INTO kv (key, value) VALUES ($1, $2) ON CONFLICT (key) DO UPDATE SET value=EXCLUDED.value")
            .bind(key)
            .bind(sqlx::types::Json(value))
            .execute(&mut *db)
    )?;
    Ok(())
}

/// Outcome of [`transfer_register`].
pub enum RegistrationResult {
    /// The `out_reserve_pub_reuse` flag was not set
    Success,
    /// The `out_reserve_pub_reuse` flag was set
    ReservePubReuse,
}

/// Register a prepared transfer through the `register_prepared_transfers` SQL
/// function.
///
/// Binds, in order: the request type converted to [`IncomingType`], the account
/// public key, the authorization public key and signature, the `recurrent`
/// value and the current time. The time is read with `Timestamp::now()` inside
/// this function instead of being supplied by the caller.
pub async fn transfer_register(
    db: &PgPool,
    req: &RegistrationRequest,
) -> sqlx::Result<RegistrationResult> {
    let ty: IncomingType = req.r#type.into();
    serialized!(
        sqlx::query(
            "SELECT out_reserve_pub_reuse FROM register_prepared_transfers($1,$2,$3,$4,$5,$6)"
        )
        .bind(ty)
        .bind(&req.account_pub)
        .bind(&req.authorization_pub)
        .bind(&req.authorization_sig)
        .bind(req.recurrent)
        .bind_timestamp(&Timestamp::now())
        .try_map(|r: PgRow| {
            Ok(if r.try_get_flag("out_reserve_pub_reuse")?
{
                RegistrationResult::ReservePubReuse
            } else {
                RegistrationResult::Success
            })
        })
        .fetch_one(db)
    )
}

/// Delete a prepared transfer through the `delete_prepared_transfers` SQL
/// function.
///
/// Binds the authorization public key of `req` and the current time (read with
/// `Timestamp::now()` inside this function) and returns the `out_found` flag.
pub async fn transfer_unregister(db: &PgPool, req: &Unregistration) -> sqlx::Result<bool> {
    serialized!(
        sqlx::query("SELECT out_found FROM delete_prepared_transfers($1,$2)")
            .bind(&req.authorization_pub)
            .bind_timestamp(&Timestamp::now())
            .try_map(|r: PgRow| r.try_get_flag("out_found"))
            .fetch_one(db)
    )
}

#[cfg(test)]
mod test {
    use jiff::{Span, Timestamp, Zoned};
    use serde_json::json;
    use sqlx::{PgPool, Postgres, pool::PoolConnection, postgres::PgRow};
    use taler_api::{
        db::TypeHelper,
        notification::dummy_listen,
        subject::{IncomingSubject, OutgoingSubject},
    };
    use taler_common::{
        api::{
            EddsaPublicKey, HashCode, ShortHashCode,
            params::{History, Page},
        },
        types::{
            amount::{amount, decimal},
            url,
            utils::now_sql_stable_ts,
        },
    };

    use super::TxInAdmin;
    use crate::{
        constants::CONFIG_SOURCE,
        db::{
            self, AddIncomingResult, AddOutgoingResult, BounceResult, Initiated, OutFailureResult,
            TransferResult, TxIn, TxOut, TxOutKind, kv_get, kv_set, make_transfer,
            register_bounce_tx_in, register_tx_in, register_tx_in_admin, register_tx_out,
        },
        magnet_api::types::TxStatus,
        magnet_payto,
    };

    /// Obtain a connection and a pool from `taler_test_utils` for `CONFIG_SOURCE`
    async fn setup() -> (PoolConnection<Postgres>, PgPool) {
        taler_test_utils::db::db_test_setup(CONFIG_SOURCE).await
    }

    /// A missing key reads as `None`; setting the same key twice succeeds and
    /// the stored value is read back unchanged
    #[tokio::test]
    async fn kv() {
        let (mut db, _) = setup().await;

        let value = json!({
            "name": "Mr Smith",
            "no way": 32
        });

        assert_eq!(
            kv_get::<serde_json::Value>(&mut db, "value").await.unwrap(),
            None
        );
        kv_set(&mut db, "value", &value).await.unwrap();
        kv_set(&mut db, "value", &value).await.unwrap();
        assert_eq!(
            kv_get::<serde_json::Value>(&mut db, "value").await.unwrap(),
            Some(value)
        );
938 } 939 940 #[tokio::test] 941 async fn tx_in() { 942 let (mut db, pool) = setup().await; 943 944 let mut routine = async |first: &Option<IncomingSubject>, 945 second: &Option<IncomingSubject>| { 946 let (id, code) = 947 sqlx::query("SELECT count(*) + 1, COALESCE(max(magnet_code), 0) + 20 FROM tx_in") 948 .try_map(|r: PgRow| Ok((r.try_get_u64(0)?, r.try_get_u64(1)?))) 949 .fetch_one(&mut *db) 950 .await 951 .unwrap(); 952 let now = now_sql_stable_ts(); 953 let date = Zoned::now().date(); 954 let later = date.tomorrow().unwrap(); 955 let tx = TxIn { 956 code, 957 amount: amount("EUR:10"), 958 subject: "subject".into(), 959 debtor: magnet_payto( 960 "payto://iban/HU30162000031000163100000000?receiver-name=name", 961 ), 962 value_date: date, 963 status: TxStatus::Completed, 964 }; 965 // Insert 966 assert_eq!( 967 register_tx_in(&mut db, &tx, first, &now) 968 .await 969 .expect("register tx in"), 970 AddIncomingResult::Success { 971 new: true, 972 pending: false, 973 row_id: id, 974 valued_at: date 975 } 976 ); 977 // Idempotent 978 assert_eq!( 979 register_tx_in( 980 &mut db, 981 &TxIn { 982 value_date: later, 983 ..tx.clone() 984 }, 985 first, 986 &now 987 ) 988 .await 989 .expect("register tx in"), 990 AddIncomingResult::Success { 991 new: false, 992 pending: false, 993 row_id: id, 994 valued_at: date 995 } 996 ); 997 // Many 998 assert_eq!( 999 register_tx_in( 1000 &mut db, 1001 &TxIn { 1002 code: code + 1, 1003 value_date: later, 1004 ..tx 1005 }, 1006 second, 1007 &now 1008 ) 1009 .await 1010 .expect("register tx in"), 1011 AddIncomingResult::Success { 1012 new: true, 1013 pending: false, 1014 row_id: id + 1, 1015 valued_at: later 1016 } 1017 ); 1018 }; 1019 1020 // Empty db 1021 assert_eq!( 1022 db::revenue_history(&pool, &History::default(), dummy_listen) 1023 .await 1024 .unwrap(), 1025 Vec::new() 1026 ); 1027 assert_eq!( 1028 db::incoming_history(&pool, &History::default(), dummy_listen) 1029 .await 1030 .unwrap(), 1031 Vec::new() 1032 ); 1033 1034 // 
Regular transaction 1035 routine(&None, &None).await; 1036 1037 // Reserve transaction 1038 routine( 1039 &Some(IncomingSubject::Reserve(EddsaPublicKey::rand())), 1040 &Some(IncomingSubject::Reserve(EddsaPublicKey::rand())), 1041 ) 1042 .await; 1043 1044 // Kyc transaction 1045 routine( 1046 &Some(IncomingSubject::Kyc(EddsaPublicKey::rand())), 1047 &Some(IncomingSubject::Kyc(EddsaPublicKey::rand())), 1048 ) 1049 .await; 1050 1051 // History 1052 assert_eq!( 1053 db::revenue_history(&pool, &History::default(), dummy_listen) 1054 .await 1055 .unwrap() 1056 .len(), 1057 6 1058 ); 1059 assert_eq!( 1060 db::incoming_history(&pool, &History::default(), dummy_listen) 1061 .await 1062 .unwrap() 1063 .len(), 1064 4 1065 ); 1066 } 1067 1068 #[tokio::test] 1069 async fn tx_in_admin() { 1070 let (_, pool) = setup().await; 1071 1072 // Empty db 1073 assert_eq!( 1074 db::incoming_history(&pool, &History::default(), dummy_listen) 1075 .await 1076 .unwrap(), 1077 Vec::new() 1078 ); 1079 1080 let now = now_sql_stable_ts(); 1081 let later = now + Span::new().hours(2); 1082 let date = Zoned::now().date(); 1083 let tx = TxInAdmin { 1084 amount: amount("EUR:10"), 1085 subject: "subject".to_owned(), 1086 debtor: magnet_payto("payto://iban/HU30162000031000163100000000?receiver-name=name"), 1087 metadata: IncomingSubject::Reserve(EddsaPublicKey::rand()), 1088 }; 1089 // Insert 1090 assert_eq!( 1091 register_tx_in_admin(&pool, &tx, &now) 1092 .await 1093 .expect("register tx in"), 1094 AddIncomingResult::Success { 1095 new: true, 1096 pending: false, 1097 row_id: 1, 1098 valued_at: date 1099 } 1100 ); 1101 // Many 1102 assert_eq!( 1103 register_tx_in_admin( 1104 &pool, 1105 &TxInAdmin { 1106 subject: "Other".to_owned(), 1107 metadata: IncomingSubject::Reserve(EddsaPublicKey::rand()), 1108 ..tx.clone() 1109 }, 1110 &later 1111 ) 1112 .await 1113 .expect("register tx in"), 1114 AddIncomingResult::Success { 1115 new: true, 1116 pending: false, 1117 row_id: 2, 1118 valued_at: date 1119 } 1120 
); 1121 1122 // History 1123 assert_eq!( 1124 db::incoming_history(&pool, &History::default(), dummy_listen) 1125 .await 1126 .unwrap() 1127 .len(), 1128 2 1129 ); 1130 } 1131 1132 #[tokio::test] 1133 async fn tx_out() { 1134 let (mut db, pool) = setup().await; 1135 1136 let mut routine = async |first: &TxOutKind, second: &TxOutKind| { 1137 let (id, code) = 1138 sqlx::query("SELECT count(*) + 1, COALESCE(max(magnet_code), 0) + 20 FROM tx_out") 1139 .try_map(|r: PgRow| Ok((r.try_get_u64(0)?, r.try_get_u64(1)?))) 1140 .fetch_one(&mut *db) 1141 .await 1142 .unwrap(); 1143 let now = now_sql_stable_ts(); 1144 let date = Zoned::now().date(); 1145 let later = date.tomorrow().unwrap(); 1146 let tx = TxOut { 1147 code, 1148 amount: amount("HUF:10"), 1149 subject: "subject".into(), 1150 creditor: magnet_payto( 1151 "payto://iban/HU30162000031000163100000000?receiver-name=name", 1152 ), 1153 value_date: date, 1154 status: TxStatus::Completed, 1155 }; 1156 assert!(matches!( 1157 make_transfer( 1158 &pool, 1159 &db::Transfer { 1160 request_uid: HashCode::rand(), 1161 amount: decimal("10"), 1162 exchange_base_url: url("https://exchange.test.com/"), 1163 metadata: None, 1164 wtid: ShortHashCode::rand(), 1165 creditor: tx.creditor.clone() 1166 }, 1167 &now 1168 ) 1169 .await 1170 .unwrap(), 1171 TransferResult::Success { .. 
}
            ));
            db::initiated_submit_success(&mut db, 1, &Timestamp::now(), tx.code)
                .await
                .expect("status success");

            // Insert
            assert_eq!(
                register_tx_out(&mut db, &tx, first, &now)
                    .await
                    .expect("register tx out"),
                AddOutgoingResult {
                    result: db::RegisterResult::known,
                    row_id: id,
                }
            );
            // Idempotent
            assert_eq!(
                register_tx_out(
                    &mut db,
                    &TxOut {
                        value_date: later,
                        ..tx.clone()
                    },
                    first,
                    &now
                )
                .await
                .expect("register tx out"),
                AddOutgoingResult {
                    result: db::RegisterResult::idempotent,
                    row_id: id,
                }
            );
            // Recovered
            assert_eq!(
                register_tx_out(
                    &mut db,
                    &TxOut {
                        code: code + 1,
                        value_date: later,
                        ..tx.clone()
                    },
                    second,
                    &now
                )
                .await
                .expect("register tx out"),
                AddOutgoingResult {
                    result: db::RegisterResult::recovered,
                    row_id: id + 1,
                }
            );
        };

        // Empty db
        assert_eq!(
            db::outgoing_history(&pool, &History::default(), dummy_listen)
                .await
                .unwrap(),
            Vec::new()
        );

        // Regular transaction
        routine(&TxOutKind::Simple, &TxOutKind::Simple).await;

        // Talerable transaction
        routine(
            &TxOutKind::Talerable(OutgoingSubject::rand()),
            &TxOutKind::Talerable(OutgoingSubject::rand()),
        )
        .await;

        // Bounced transaction
        routine(&TxOutKind::Bounce(21), &TxOutKind::Bounce(42)).await;

        // History
        assert_eq!(
            db::outgoing_history(&pool, &History::default(), dummy_listen)
                .await
                .unwrap()
                .len(),
            2
        );
    }

    /// Exercises `db::register_tx_out_failure` in three situations: a code that
    /// matches nothing, a code attached to a submitted transfer, and a failure
    /// reported against a bounce of an incoming transaction.
    ///
    /// Each registration is issued twice: the first call is expected to report
    /// `new: true`, the repeated call `new: false` with the same `initiated_id`.
    #[tokio::test]
    async fn tx_out_failure() {
        let (mut db, pool) = setup().await;

        let now = now_sql_stable_ts();
        // Same account is used as transfer creditor and as debtor of the bounced tx.
        let payto = magnet_payto("payto://iban/HU30162000031000163100000000?receiver-name=name");

        // Unknown
        assert_eq!(
            db::register_tx_out_failure(&mut db, 42, None, &now)
                .await
                .unwrap(),
            OutFailureResult {
                initiated_id: None,
                new: false
            }
        );
        assert_eq!(
            db::register_tx_out_failure(&mut db, 42, Some(12), &now)
                .await
                .unwrap(),
            OutFailureResult {
                initiated_id: None,
                new: false
            }
        );

        // Initiated
        let req = db::Transfer {
            request_uid: HashCode::rand(),
            amount: decimal("10"),
            exchange_base_url: url("https://exchange.test.com/"),
            metadata: None,
            wtid: ShortHashCode::rand(),
            creditor: payto.clone(),
        };
        assert_eq!(
            make_transfer(&pool, &req, &now).await.unwrap(),
            TransferResult::Success {
                id: 1,
                initiated_at: now
            }
        );
        // Attach code 34 to the initiated transfer so the failure below can find it.
        db::initiated_submit_success(&mut db, 1, &Timestamp::now(), 34)
            .await
            .expect("status success");
        assert_eq!(
            db::register_tx_out_failure(&mut db, 34, None, &now)
                .await
                .unwrap(),
            OutFailureResult {
                initiated_id: Some(1),
                new: true
            }
        );
        assert_eq!(
            db::register_tx_out_failure(&mut db, 34, None, &now)
                .await
                .unwrap(),
            OutFailureResult {
                initiated_id: Some(1),
                new: false
            }
        );

        // Recovered bounce
        let tx = TxIn {
            code: 12,
            amount: amount("HUF:11"),
            subject: "malformed transaction".into(),
            debtor: payto,
            value_date: Zoned::now().date(),
            status: TxStatus::Completed,
        };
        assert_eq!(
            db::register_bounce_tx_in(&mut db, &tx, "no reason", &now)
                .await
                .unwrap(),
            BounceResult {
                tx_id: 1,
                tx_new: true,
                bounce_id: 2,
                bounce_new: true
            }
        );
        // NOTE(review): `Some(12)` presumably names the bounced incoming tx code, which
        // is how the so-far unknown code 10 resolves to initiated id 2 -- confirm.
        assert_eq!(
            db::register_tx_out_failure(&mut db, 10, Some(12), &now)
                .await
                .unwrap(),
            OutFailureResult {
                initiated_id: Some(2),
                new: true
            }
        );
        assert_eq!(
            db::register_tx_out_failure(&mut db, 10, Some(12), &now)
                .await
                .unwrap(),
            OutFailureResult {
                initiated_id: Some(2),
                new: false
            }
        );
    }

    /// Exercises `make_transfer`, `db::transfer_by_id` and `db::transfer_page`:
    /// empty database, first insertion, idempotent replay, request-uid reuse,
    /// wtid reuse, and a second independent transfer.
    #[tokio::test]
    async fn transfer() {
        let (_, pool) = setup().await;

        // Empty db
        assert_eq!(db::transfer_by_id(&pool, 0).await.unwrap(), None);
        assert_eq!(
            db::transfer_page(&pool, &None, &Page::default())
                .await
                .unwrap(),
            Vec::new()
        );

        let req = db::Transfer {
            request_uid: HashCode::rand(),
            amount: decimal("10"),
            exchange_base_url: url("https://exchange.test.com/"),
            metadata: None,
            wtid: ShortHashCode::rand(),
            creditor: magnet_payto("payto://iban/HU02162000031000164800000000?receiver-name=name"),
        };
        let now = now_sql_stable_ts();
        let later = now + Span::new().hours(2);
        // Insert
        assert_eq!(
            make_transfer(&pool, &req, &now).await.expect("transfer"),
            TransferResult::Success {
                id: 1,
                initiated_at: now
            }
        );
        // Idempotent: replaying the same request later reports the original timestamp
        assert_eq!(
            make_transfer(&pool, &req, &later).await.expect("transfer"),
            TransferResult::Success {
                id: 1,
                initiated_at: now
            }
        );
        // Request UID reuse
        assert_eq!(
            make_transfer(
                &pool,
                &db::Transfer {
                    wtid: ShortHashCode::rand(),
                    ..req.clone()
                },
                &now
            )
            .await
            .expect("transfer"),
            TransferResult::RequestUidReuse
        );
        // wtid reuse
        assert_eq!(
            make_transfer(
                &pool,
                &db::Transfer {
                    request_uid: HashCode::rand(),
                    ..req.clone()
                },
                &now
            )
            .await
            .expect("transfer"),
            TransferResult::WtidReuse
        );
        // Many
        assert_eq!(
            make_transfer(
                &pool,
                &db::Transfer {
                    request_uid: HashCode::rand(),
                    wtid: ShortHashCode::rand(),
                    ..req
                },
                &later
            )
            .await
            .expect("transfer"),
            TransferResult::Success {
                id: 2,
                initiated_at: later
            }
        );

        // Get
        assert!(db::transfer_by_id(&pool, 1).await.unwrap().is_some());
        assert!(db::transfer_by_id(&pool, 2).await.unwrap().is_some());
        assert!(db::transfer_by_id(&pool, 3).await.unwrap().is_none());
        assert_eq!(
            db::transfer_page(&pool, &None, &Page::default())
                .await
                .unwrap()
                .len(),
            2
        );
    }

    /// Exercises `register_bounce_tx_in` for an incoming transaction that is
    /// first seen through the bounce itself (code 12) and for one that was
    /// already registered with `register_tx_in` (code 13), replaying each call
    /// once, then checks that both bounces show up in `db::pending_batch`.
    #[tokio::test]
    async fn bounce() {
        let (mut db, _) = setup().await;

        let amount = amount("HUF:10");
        let payto = magnet_payto("payto://iban/HU30162000031000163100000000?receiver-name=name");
        let now = now_sql_stable_ts();
        let date = Zoned::now().date();
        // Every incoming transaction in this test differs only by its code.
        let tx_in = |code| TxIn {
            code,
            amount,
            subject: "subject".into(),
            debtor: payto.clone(),
            value_date: date,
            status: TxStatus::Completed,
        };

        // Empty db
        assert!(db::pending_batch(&mut db, &now).await.unwrap().is_empty());

        // Insert
        assert_eq!(
            register_tx_in(&mut db, &tx_in(13), &None, &now)
                .await
                .expect("register tx in"),
            AddIncomingResult::Success {
                new: true,
                pending: false,
                row_id: 1,
                valued_at: date
            }
        );

        // Bounce
        assert_eq!(
            register_bounce_tx_in(&mut db, &tx_in(12), "good reason", &now)
                .await
                .expect("bounce"),
            BounceResult {
                tx_id: 2,
                tx_new: true,
                bounce_id: 1,
                bounce_new: true
            }
        );
        // Idempotent
        assert_eq!(
            register_bounce_tx_in(&mut db, &tx_in(12), "good reason", &now)
                .await
                .expect("bounce"),
            BounceResult {
                tx_id: 2,
                tx_new: false,
                bounce_id: 1,
                bounce_new: false
            }
        );

        // Bounce registered
        assert_eq!(
            register_bounce_tx_in(&mut db, &tx_in(13), "good reason", &now)
                .await
                .expect("bounce"),
            BounceResult {
                tx_id: 1,
                tx_new: false,
                bounce_id: 2,
                bounce_new: true
            }
        );
        // Idempotent registered
        assert_eq!(
            register_bounce_tx_in(&mut db, &tx_in(13), "good reason", &now)
                .await
                .expect("bounce"),
            BounceResult {
                tx_id: 1,
                tx_new: false,
                bounce_id: 2,
                bounce_new: false
            }
        );

        // Batch
        assert_eq!(
            db::pending_batch(&mut db, &now).await.unwrap(),
            &[
                Initiated {
                    id: 1,
                    amount,
                    subject: "bounce: 12".into(),
                    creditor: payto.clone()
                },
                Initiated {
                    id: 2,
                    amount,
                    subject: "bounce: 13".into(),
                    creditor: payto
                }
            ]
        );
    }

    /// Checks that reporting a submission outcome for an initiated id that does
    /// not exist (the database is empty) returns `Ok` for both the
    /// permanent-failure and the success path.
    #[tokio::test]
    async fn status() {
        let (mut db, _) = setup().await;

        // Unknown transfer
        db::initiated_submit_permanent_failure(&mut db, 1, &Timestamp::now(), "msg")
            .await
            .unwrap();
        db::initiated_submit_success(&mut db, 1, &Timestamp::now(), 12)
            .await
            .unwrap();
    }

    /// Exercises `db::pending_batch`: empty database, a few pending transfers,
    /// the cap of 100 entries per batch, and exclusion of transfers that were
    /// reported as submitted or as permanently failed.
    #[tokio::test]
    async fn batch() {
        let (mut db, pool) = setup().await;
        let start = Timestamp::now();
        let payto = magnet_payto("payto://iban/HU30162000031000163100000000?receiver-name=name");

        // Empty db
        let pendings = db::pending_batch(&mut db, &start)
            .await
            .expect("pending_batch");
        assert_eq!(pendings.len(), 0);

        // Some transfers
        for i in 0..3 {
            make_transfer(
                &pool,
                &db::Transfer {
                    request_uid: HashCode::rand(),
                    amount: decimal((i + 1).to_string()),
                    exchange_base_url: url("https://exchange.test.com/"),
                    metadata: None,
                    wtid: ShortHashCode::rand(),
                    creditor: payto.clone(),
                },
                &Timestamp::now(),
            )
            .await
            .expect("transfer");
        }
        let pendings = db::pending_batch(&mut db, &start)
            .await
            .expect("pending_batch");
        assert_eq!(pendings.len(), 3);

        // Max 100 txs in batch: 103 transfers are pending but only 100 are returned
        for i in 0..100 {
            make_transfer(
                &pool,
                &db::Transfer {
                    request_uid: HashCode::rand(),
                    amount: decimal((i + 1).to_string()),
                    exchange_base_url: url("https://exchange.test.com/"),
                    metadata: None,
                    wtid: ShortHashCode::rand(),
                    creditor: payto.clone(),
                },
                &Timestamp::now(),
            )
            .await
            .expect("transfer");
        }
        let pendings = db::pending_batch(&mut db, &start)
            .await
            .expect("pending_batch");
        assert_eq!(pendings.len(), 100);

        // Skip uploaded
        // NOTE(review): the loop covers eleven ids (0..=10) but the expected count only
        // drops by ten (103 -> 93); presumably ids start at 1 and id 0 matches no row.
        for i in 0..=10 {
            db::initiated_submit_success(&mut db, i, &Timestamp::now(), i)
                .await
                .expect("status success");
        }
        let pendings = db::pending_batch(&mut db, &start)
            .await
            .expect("pending_batch");
        assert_eq!(pendings.len(), 93);

        // Skip failed
        // NOTE(review): ids 10..=20 are touched but the count drops by ten (93 -> 83);
        // presumably id 10 was already excluded by the success loop above.
        for i in 0..=10 {
            db::initiated_submit_permanent_failure(&mut db, 10 + i, &Timestamp::now(), "failure")
                .await
                .expect("status failure");
        }
        let pendings = db::pending_batch(&mut db, &start)
            .await
            .expect("pending_batch");
        assert_eq!(pendings.len(), 83);
    }
}