db.rs (50719B)
1 /* 2 This file is part of TALER 3 Copyright (C) 2025, 2026 Taler Systems SA 4 5 TALER is free software; you can redistribute it and/or modify it under the 6 terms of the GNU Affero General Public License as published by the Free Software 7 Foundation; either version 3, or (at your option) any later version. 8 9 TALER is distributed in the hope that it will be useful, but WITHOUT ANY 10 WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR 11 A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. 12 13 You should have received a copy of the GNU Affero General Public License along with 14 TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> 15 */ 16 17 use std::fmt::Display; 18 19 use compact_str::CompactString; 20 use jiff::{Timestamp, civil::Date, tz::TimeZone}; 21 use serde::{Serialize, de::DeserializeOwned}; 22 use sqlx::{PgConnection, PgPool, QueryBuilder, Row, postgres::PgRow}; 23 use taler_api::{ 24 db::{BindHelper, TypeHelper, history, page}, 25 serialized, 26 subject::{IncomingSubject, OutgoingSubject, fmt_out_subject}, 27 }; 28 use taler_common::{ 29 api_common::{HashCode, ShortHashCode}, 30 api_params::{History, Page}, 31 api_revenue::RevenueIncomingBankTransaction, 32 api_transfer::{RegistrationRequest, Unregistration}, 33 api_wire::{ 34 IncomingBankTransaction, OutgoingBankTransaction, TransferListStatus, TransferState, 35 TransferStatus, 36 }, 37 config::Config, 38 db::IncomingType, 39 types::{ 40 amount::{Amount, Decimal}, 41 payto::PaytoImpl as _, 42 }, 43 }; 44 use tokio::sync::watch::{Receiver, Sender}; 45 use url::Url; 46 47 use crate::{FullHuPayto, config::parse_db_cfg, constants::CURRENCY, magnet_api::types::TxStatus}; 48 49 const SCHEMA: &str = "magnet_bank"; 50 51 pub async fn pool(cfg: &Config) -> anyhow::Result<PgPool> { 52 let db = parse_db_cfg(cfg)?; 53 let pool = taler_common::db::pool(db.cfg, SCHEMA).await?; 54 Ok(pool) 55 } 56 57 pub async fn dbinit(cfg: &Config, 
reset: bool) -> anyhow::Result<PgPool> { 58 let db_cfg = parse_db_cfg(cfg)?; 59 let pool = taler_common::db::pool(db_cfg.cfg, SCHEMA).await?; 60 let mut db = pool.acquire().await?; 61 taler_common::db::dbinit(&mut db, db_cfg.sql_dir.as_ref(), "magnet-bank", reset).await?; 62 Ok(pool) 63 } 64 65 pub async fn notification_listener( 66 pool: PgPool, 67 in_channel: Sender<i64>, 68 taler_in_channel: Sender<i64>, 69 out_channel: Sender<i64>, 70 taler_out_channel: Sender<i64>, 71 ) -> sqlx::Result<()> { 72 taler_api::notification::notification_listener!(&pool, 73 "tx_in" => (row_id: i64) { 74 in_channel.send_replace(row_id); 75 }, 76 "taler_in" => (row_id: i64) { 77 taler_in_channel.send_replace(row_id); 78 }, 79 "tx_out" => (row_id: i64) { 80 out_channel.send_replace(row_id); 81 }, 82 "taler_out" => (row_id: i64) { 83 taler_out_channel.send_replace(row_id); 84 } 85 ) 86 } 87 88 #[derive(Debug, Clone)] 89 pub struct TxIn { 90 pub code: u64, 91 pub amount: Amount, 92 pub subject: Box<str>, 93 pub debtor: FullHuPayto, 94 pub value_date: Date, 95 pub status: TxStatus, 96 } 97 98 impl Display for TxIn { 99 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { 100 let Self { 101 code, 102 amount, 103 subject, 104 debtor, 105 value_date, 106 status, 107 } = self; 108 write!( 109 f, 110 "{value_date} {code} {amount} ({} {}) {status:?} '{subject}'", 111 debtor.bban(), 112 debtor.name 113 ) 114 } 115 } 116 117 #[derive(Debug, Clone)] 118 pub struct TxOut { 119 pub code: u64, 120 pub amount: Amount, 121 pub subject: Box<str>, 122 pub creditor: FullHuPayto, 123 pub value_date: Date, 124 pub status: TxStatus, 125 } 126 127 impl Display for TxOut { 128 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { 129 let Self { 130 code, 131 amount, 132 subject, 133 creditor, 134 value_date, 135 status, 136 } = self; 137 write!( 138 f, 139 "{value_date} {code} {amount} ({} {}) {status:?} '{subject}'", 140 creditor.bban(), 141 &creditor.name 142 ) 143 } 144 } 145 
146 #[derive(Debug, PartialEq, Eq)] 147 pub struct Initiated { 148 pub id: u64, 149 pub amount: Amount, 150 pub subject: Box<str>, 151 pub creditor: FullHuPayto, 152 } 153 154 impl Display for Initiated { 155 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { 156 let Self { 157 id, 158 amount, 159 subject, 160 creditor, 161 } = self; 162 write!( 163 f, 164 "{id} {amount} ({} {}) '{subject}'", 165 creditor.bban(), 166 &creditor.name 167 ) 168 } 169 } 170 171 #[derive(Debug, Clone)] 172 pub struct TxInAdmin { 173 pub amount: Amount, 174 pub subject: String, 175 pub debtor: FullHuPayto, 176 pub metadata: IncomingSubject, 177 } 178 179 /// Lock the database for worker execution 180 pub async fn worker_lock(e: &mut PgConnection) -> sqlx::Result<bool> { 181 sqlx::query("SELECT pg_try_advisory_lock(42)") 182 .try_map(|r: PgRow| r.try_get(0)) 183 .fetch_one(e) 184 .await 185 } 186 187 #[derive(Debug, PartialEq, Eq)] 188 pub enum AddIncomingResult { 189 Success { 190 new: bool, 191 pending: bool, 192 row_id: u64, 193 valued_at: Date, 194 }, 195 ReservePubReuse, 196 UnknownMapping, 197 MappingReuse, 198 } 199 200 pub async fn register_tx_in_admin( 201 db: &PgPool, 202 tx: &TxInAdmin, 203 now: &Timestamp, 204 ) -> sqlx::Result<AddIncomingResult> { 205 serialized!( 206 sqlx::query( 207 " 208 SELECT out_reserve_pub_reuse, out_mapping_reuse, out_unknown_mapping, out_tx_row_id, out_valued_at, out_new, out_pending 209 FROM register_tx_in(NULL, $1, $2, $3, $4, $5, $6, $7, $5) 210 ", 211 ) 212 .bind(&tx.amount) 213 .bind(&tx.subject) 214 .bind(tx.debtor.iban()) 215 .bind(&tx.debtor.name) 216 .bind_date(&now.to_zoned(TimeZone::UTC).date()) 217 .bind(tx.metadata.ty()) 218 .bind(tx.metadata.key()) 219 .try_map(|r: PgRow| { 220 Ok(if r.try_get_flag(0)? { 221 AddIncomingResult::ReservePubReuse 222 } else if r.try_get_flag(1)? { 223 AddIncomingResult::MappingReuse 224 } else if r.try_get_flag(2)? 
{ 225 AddIncomingResult::UnknownMapping 226 } else { 227 AddIncomingResult::Success { 228 row_id: r.try_get_u64(3)?, 229 valued_at: r.try_get_date(4)?, 230 new: r.try_get(5)?, 231 pending: r.try_get(6)? 232 } 233 }) 234 }) 235 .fetch_one(db) 236 ) 237 } 238 239 pub async fn register_tx_in( 240 db: &mut PgConnection, 241 tx: &TxIn, 242 subject: &Option<IncomingSubject>, 243 now: &Timestamp, 244 ) -> sqlx::Result<AddIncomingResult> { 245 serialized!( 246 sqlx::query( 247 " 248 SELECT out_reserve_pub_reuse, out_mapping_reuse, out_unknown_mapping, out_tx_row_id, out_valued_at, out_new, out_pending 249 FROM register_tx_in($1, $2, $3, $4, $5, $6, $7, $8, $9) 250 ", 251 ) 252 .bind(tx.code as i64) 253 .bind(&tx.amount) 254 .bind(&tx.subject) 255 .bind(tx.debtor.iban()) 256 .bind(&tx.debtor.name) 257 .bind_date(&tx.value_date) 258 .bind(subject.as_ref().map(|it| it.ty())) 259 .bind(subject.as_ref().map(|it| it.key())) 260 .bind_timestamp(now) 261 .try_map(|r: PgRow| { 262 Ok(if r.try_get_flag(0)? { 263 AddIncomingResult::ReservePubReuse 264 } else if r.try_get_flag(1)? { 265 AddIncomingResult::MappingReuse 266 } else if r.try_get_flag(2)? { 267 AddIncomingResult::UnknownMapping 268 } else { 269 AddIncomingResult::Success { 270 row_id: r.try_get_u64(3)?, 271 valued_at: r.try_get_date(4)?, 272 new: r.try_get(5)?, 273 pending: r.try_get(6)? 
274 } 275 }) 276 }) 277 .fetch_one(&mut *db) 278 ) 279 } 280 281 #[derive(Debug)] 282 pub enum TxOutKind { 283 Simple, 284 Bounce(u32), 285 Talerable(OutgoingSubject), 286 } 287 288 #[derive(Debug, Clone, Copy, PartialEq, Eq, sqlx::Type)] 289 #[allow(non_camel_case_types)] 290 #[sqlx(type_name = "register_result")] 291 pub enum RegisterResult { 292 /// Already registered 293 idempotent, 294 /// Initiated transaction 295 known, 296 /// Recovered unknown outgoing transaction 297 recovered, 298 } 299 300 #[derive(Debug, PartialEq, Eq)] 301 pub struct AddOutgoingResult { 302 pub result: RegisterResult, 303 pub row_id: u64, 304 } 305 306 pub async fn register_tx_out( 307 db: &mut PgConnection, 308 tx: &TxOut, 309 kind: &TxOutKind, 310 now: &Timestamp, 311 ) -> sqlx::Result<AddOutgoingResult> { 312 serialized!({ 313 let query = sqlx::query( 314 " 315 SELECT out_result, out_tx_row_id 316 FROM register_tx_out($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) 317 ", 318 ) 319 .bind(tx.code as i64) 320 .bind(&tx.amount) 321 .bind(&tx.subject) 322 .bind(tx.creditor.iban()) 323 .bind(&tx.creditor.name) 324 .bind_date(&tx.value_date); 325 let query = match kind { 326 TxOutKind::Simple => query 327 .bind(None::<&[u8]>) 328 .bind(None::<&str>) 329 .bind(None::<&str>) 330 .bind(None::<i64>), 331 TxOutKind::Bounce(bounced) => query 332 .bind(None::<&[u8]>) 333 .bind(None::<&str>) 334 .bind(None::<&str>) 335 .bind(*bounced as i64), 336 TxOutKind::Talerable(subject) => query 337 .bind(&subject.wtid) 338 .bind(subject.exchange_base_url.as_str()) 339 .bind(&subject.metadata) 340 .bind(None::<i64>), 341 }; 342 query 343 .bind_timestamp(now) 344 .try_map(|r: PgRow| { 345 Ok(AddOutgoingResult { 346 result: r.try_get(0)?, 347 row_id: r.try_get_u64(1)?, 348 }) 349 }) 350 .fetch_one(&mut *db) 351 }) 352 } 353 354 #[derive(Debug, PartialEq, Eq)] 355 pub struct OutFailureResult { 356 pub initiated_id: Option<u64>, 357 pub new: bool, 358 } 359 360 pub async fn register_tx_out_failure( 361 db: &mut 
PgConnection, 362 code: u64, 363 bounced: Option<u32>, 364 now: &Timestamp, 365 ) -> sqlx::Result<OutFailureResult> { 366 serialized!( 367 sqlx::query( 368 " 369 SELECT out_new, out_initiated_id 370 FROM register_tx_out_failure($1, $2, $3) 371 ", 372 ) 373 .bind(code as i64) 374 .bind(bounced.map(|i| i as i32)) 375 .bind_timestamp(now) 376 .try_map(|r: PgRow| { 377 Ok(OutFailureResult { 378 new: r.try_get(0)?, 379 initiated_id: r.try_get::<Option<i64>, _>(1)?.map(|i| i as u64), 380 }) 381 }) 382 .fetch_one(&mut *db) 383 ) 384 } 385 386 #[derive(Debug, PartialEq, Eq)] 387 pub enum TransferResult { 388 Success { id: u64, initiated_at: Timestamp }, 389 RequestUidReuse, 390 WtidReuse, 391 } 392 393 #[derive(Debug, Clone)] 394 pub struct Transfer { 395 pub request_uid: HashCode, 396 pub amount: Decimal, 397 pub exchange_base_url: Url, 398 pub metadata: Option<CompactString>, 399 pub wtid: ShortHashCode, 400 pub creditor: FullHuPayto, 401 } 402 403 pub async fn make_transfer( 404 db: &PgPool, 405 tx: &Transfer, 406 now: &Timestamp, 407 ) -> sqlx::Result<TransferResult> { 408 let subject = fmt_out_subject(&tx.wtid, &tx.exchange_base_url, tx.metadata.as_deref()); 409 serialized!( 410 sqlx::query( 411 " 412 SELECT out_request_uid_reuse, out_wtid_reuse, out_initiated_row_id, out_initiated_at 413 FROM taler_transfer($1, $2, $3, $4, $5, $6, $7, $8, $9) 414 ", 415 ) 416 .bind(&tx.request_uid) 417 .bind(&tx.wtid) 418 .bind(&subject) 419 .bind(tx.amount) 420 .bind(tx.exchange_base_url.as_str()) 421 .bind(&tx.metadata) 422 .bind(tx.creditor.iban()) 423 .bind(&tx.creditor.name) 424 .bind_timestamp(now) 425 .try_map(|r: PgRow| { 426 Ok(if r.try_get_flag(0)? { 427 TransferResult::RequestUidReuse 428 } else if r.try_get_flag(1)? 
{ 429 TransferResult::WtidReuse 430 } else { 431 TransferResult::Success { 432 id: r.try_get_u64(2)?, 433 initiated_at: r.try_get_timestamp(3)?, 434 } 435 }) 436 }) 437 .fetch_one(db) 438 ) 439 } 440 441 #[derive(Debug, PartialEq, Eq)] 442 pub struct BounceResult { 443 pub tx_id: u64, 444 pub tx_new: bool, 445 pub bounce_id: u64, 446 pub bounce_new: bool, 447 } 448 449 pub async fn register_bounce_tx_in( 450 db: &mut PgConnection, 451 tx: &TxIn, 452 reason: &str, 453 now: &Timestamp, 454 ) -> sqlx::Result<BounceResult> { 455 serialized!( 456 sqlx::query( 457 " 458 SELECT out_tx_row_id, out_tx_new, out_bounce_row_id, out_bounce_new 459 FROM register_bounce_tx_in($1, $2, $3, $4, $5, $6, $7, $8) 460 ", 461 ) 462 .bind(tx.code as i64) 463 .bind(&tx.amount) 464 .bind(&tx.subject) 465 .bind(tx.debtor.iban()) 466 .bind(&tx.debtor.name) 467 .bind_date(&tx.value_date) 468 .bind(reason) 469 .bind_timestamp(now) 470 .try_map(|r: PgRow| { 471 Ok(BounceResult { 472 tx_id: r.try_get_u64(0)?, 473 tx_new: r.try_get(1)?, 474 bounce_id: r.try_get_u64(2)?, 475 bounce_new: r.try_get(3)?, 476 }) 477 }) 478 .fetch_one(&mut *db) 479 ) 480 } 481 482 pub async fn transfer_page( 483 db: &PgPool, 484 status: &Option<TransferState>, 485 params: &Page, 486 ) -> sqlx::Result<Vec<TransferListStatus>> { 487 page( 488 db, 489 "initiated_id", 490 params, 491 || { 492 let mut builder = QueryBuilder::new( 493 " 494 SELECT 495 initiated_id, 496 status, 497 amount, 498 credit_account, 499 credit_name, 500 initiated_at 501 FROM transfer 502 JOIN initiated USING (initiated_id) 503 WHERE 504 ", 505 ); 506 if let Some(status) = status { 507 builder.push(" status = ").push_bind(status).push(" AND "); 508 } 509 builder 510 }, 511 |r: PgRow| { 512 Ok(TransferListStatus { 513 row_id: r.try_get_safeu64(0)?, 514 status: r.try_get(1)?, 515 amount: r.try_get_amount(2, &CURRENCY)?, 516 credit_account: r.try_get_iban(3)?.as_full_payto(r.try_get(4)?), 517 timestamp: r.try_get_timestamp(5)?.into(), 518 }) 519 }, 520 ) 
521 .await 522 } 523 524 pub async fn outgoing_history( 525 db: &PgPool, 526 params: &History, 527 listen: impl FnOnce() -> Receiver<i64>, 528 ) -> sqlx::Result<Vec<OutgoingBankTransaction>> { 529 history( 530 db, 531 "tx_out_id", 532 params, 533 listen, 534 || { 535 QueryBuilder::new( 536 " 537 SELECT 538 tx_out_id, 539 amount, 540 credit_account, 541 credit_name, 542 valued_at, 543 exchange_base_url, 544 metadata, 545 wtid 546 FROM taler_out 547 JOIN tx_out USING (tx_out_id) 548 WHERE 549 ", 550 ) 551 }, 552 |r: PgRow| { 553 Ok(OutgoingBankTransaction { 554 row_id: r.try_get_safeu64(0)?, 555 amount: r.try_get_amount(1, &CURRENCY)?, 556 debit_fee: None, 557 credit_account: r.try_get_iban(2)?.as_full_payto(r.try_get(3)?), 558 date: r.try_get_timestamp(4)?.into(), 559 exchange_base_url: r.try_get_url(5)?, 560 metadata: r.try_get(6)?, 561 wtid: r.try_get(7)?, 562 }) 563 }, 564 ) 565 .await 566 } 567 568 pub async fn incoming_history( 569 db: &PgPool, 570 params: &History, 571 listen: impl FnOnce() -> Receiver<i64>, 572 ) -> sqlx::Result<Vec<IncomingBankTransaction>> { 573 history( 574 db, 575 "tx_in_id", 576 params, 577 listen, 578 || { 579 QueryBuilder::new( 580 " 581 SELECT 582 type, 583 tx_in_id, 584 amount, 585 debit_account, 586 debit_name, 587 valued_at, 588 metadata, 589 authorization_pub, 590 authorization_sig 591 FROM taler_in 592 JOIN tx_in USING (tx_in_id) 593 WHERE 594 ", 595 ) 596 }, 597 |r: PgRow| { 598 Ok(match r.try_get(0)? 
{ 599 IncomingType::reserve => IncomingBankTransaction::Reserve { 600 row_id: r.try_get_safeu64(1)?, 601 amount: r.try_get_amount(2, &CURRENCY)?, 602 credit_fee: None, 603 debit_account: r.try_get_iban(3)?.as_full_payto(r.try_get(4)?), 604 date: r.try_get_timestamp(5)?.into(), 605 reserve_pub: r.try_get(6)?, 606 authorization_pub: r.try_get(7)?, 607 authorization_sig: r.try_get(8)?, 608 }, 609 IncomingType::kyc => IncomingBankTransaction::Kyc { 610 row_id: r.try_get_safeu64(1)?, 611 amount: r.try_get_amount(2, &CURRENCY)?, 612 credit_fee: None, 613 debit_account: r.try_get_iban(3)?.as_full_payto(r.try_get(4)?), 614 date: r.try_get_timestamp(5)?.into(), 615 account_pub: r.try_get(6)?, 616 authorization_pub: r.try_get(7)?, 617 authorization_sig: r.try_get(8)?, 618 }, 619 IncomingType::map => unimplemented!("MAP are never listed in the history"), 620 }) 621 }, 622 ) 623 .await 624 } 625 626 pub async fn revenue_history( 627 db: &PgPool, 628 params: &History, 629 listen: impl FnOnce() -> Receiver<i64>, 630 ) -> sqlx::Result<Vec<RevenueIncomingBankTransaction>> { 631 history( 632 db, 633 "tx_in_id", 634 params, 635 listen, 636 || { 637 QueryBuilder::new( 638 " 639 SELECT 640 tx_in_id, 641 valued_at, 642 amount, 643 debit_account, 644 debit_name, 645 subject 646 FROM tx_in 647 WHERE 648 ", 649 ) 650 }, 651 |r: PgRow| { 652 Ok(RevenueIncomingBankTransaction { 653 row_id: r.try_get_safeu64(0)?, 654 date: r.try_get_timestamp(1)?.into(), 655 amount: r.try_get_amount(2, &CURRENCY)?, 656 credit_fee: None, 657 debit_account: r.try_get_iban(3)?.as_full_payto(r.try_get(4)?), 658 subject: r.try_get(5)?, 659 }) 660 }, 661 ) 662 .await 663 } 664 665 pub async fn transfer_by_id(db: &PgPool, id: u64) -> sqlx::Result<Option<TransferStatus>> { 666 serialized!( 667 sqlx::query( 668 " 669 SELECT 670 status, 671 status_msg, 672 amount, 673 exchange_base_url, 674 metadata, 675 wtid, 676 credit_account, 677 credit_name, 678 initiated_at 679 FROM transfer 680 JOIN initiated USING 
(initiated_id) 681 WHERE initiated_id = $1 682 ", 683 ) 684 .bind(id as i64) 685 .try_map(|r: PgRow| { 686 Ok(TransferStatus { 687 status: r.try_get(0)?, 688 status_msg: r.try_get(1)?, 689 amount: r.try_get_amount(2, &CURRENCY)?, 690 origin_exchange_url: r.try_get(3)?, 691 metadata: r.try_get(4)?, 692 wtid: r.try_get(5)?, 693 credit_account: r.try_get_iban(6)?.as_full_payto(r.try_get(7)?), 694 timestamp: r.try_get_timestamp(8)?.into(), 695 }) 696 }) 697 .fetch_optional(db) 698 ) 699 } 700 701 /** Get a batch of pending initiated transactions not attempted since [start] */ 702 pub async fn pending_batch( 703 db: &mut PgConnection, 704 start: &Timestamp, 705 ) -> sqlx::Result<Vec<Initiated>> { 706 serialized!( 707 sqlx::query( 708 " 709 SELECT initiated_id, amount, subject, credit_account, credit_name 710 FROM initiated 711 WHERE magnet_code IS NULL 712 AND status='pending' 713 AND (last_submitted IS NULL OR last_submitted < $1) 714 LIMIT 100 715 ", 716 ) 717 .bind_timestamp(start) 718 .try_map(|r: PgRow| { 719 Ok(Initiated { 720 id: r.try_get_u64(0)?, 721 amount: r.try_get_amount(1, &CURRENCY)?, 722 subject: r.try_get(2)?, 723 creditor: FullHuPayto::new(r.try_get_parse(3)?, r.try_get(4)?), 724 }) 725 }) 726 .fetch_all(&mut *db) 727 ) 728 } 729 730 /** Get an initiated transaction matching the given magnet [code] */ 731 pub async fn initiated_by_code( 732 db: &mut PgConnection, 733 code: u64, 734 ) -> sqlx::Result<Option<Initiated>> { 735 serialized!( 736 sqlx::query( 737 " 738 SELECT initiated_id, amount, subject, credit_account, credit_name 739 FROM initiated 740 WHERE magnet_code IS $1 741 ", 742 ) 743 .bind(code as i64) 744 .try_map(|r: PgRow| { 745 Ok(Initiated { 746 id: r.try_get_u64(0)?, 747 amount: r.try_get_amount(1, &CURRENCY)?, 748 subject: r.try_get(2)?, 749 creditor: FullHuPayto::new(r.try_get_parse(3)?, r.try_get(4)?), 750 }) 751 }) 752 .fetch_optional(&mut *db) 753 ) 754 } 755 756 /** Update status of a successful submitted initiated transaction */ 757 
pub async fn initiated_submit_success(
    db: &mut PgConnection,
    id: u64,
    timestamp: &Timestamp,
    magnet_code: u64,
) -> sqlx::Result<()> {
    // Keeps the row 'pending', bumps the submission counter and records both the
    // submission time and the code assigned by the bank.
    serialized!(
        sqlx::query(
            "
                UPDATE initiated
                SET status='pending', submission_counter=submission_counter+1, last_submitted=$1, magnet_code=$2
                WHERE initiated_id=$3
            "
        ).bind_timestamp(timestamp)
        .bind(magnet_code as i64)
        .bind(id as i64)
        .execute(&mut *db)
    )?;
    Ok(())
}

/** Update status of a permanently failed initiated transaction */
// NOTE(review): `timestamp` is bound as `$1` only to keep `$2`/`$3` aligned; the
// statement never references `$1`, so no submission time is stored — confirm
// whether `last_submitted` should be updated here as well.
pub async fn initiated_submit_permanent_failure(
    db: &mut PgConnection,
    id: u64,
    timestamp: &Timestamp,
    msg: &str,
) -> sqlx::Result<()> {
    serialized!(
        sqlx::query(
            "
                UPDATE initiated
                SET status='permanent_failure', status_msg=$2
                WHERE initiated_id=$3
            ",
        )
        .bind_timestamp(timestamp)
        .bind(msg)
        .bind(id as i64)
        .execute(&mut *db)
    )?;
    Ok(())
}

/** Check if an initiated transaction exist for a magnet code */
// Returns the matching `initiated_id`, or `None` when no row has this code.
pub async fn initiated_exists_for_code(
    db: &mut PgConnection,
    code: u64,
) -> sqlx::Result<Option<u64>> {
    serialized!(
        sqlx::query("SELECT initiated_id FROM initiated WHERE magnet_code=$1")
            .bind(code as i64)
            .try_map(|r| Ok(r.try_get::<i64, _>(0)? as u64))
            .fetch_optional(&mut *db)
    )
}

/** Get JSON value from KV table */
// Returns `None` when `key` is absent; the stored JSON is deserialized into `T`.
pub async fn kv_get<T: DeserializeOwned + Unpin + Send>(
    db: &mut PgConnection,
    key: &str,
) -> sqlx::Result<Option<T>> {
    serialized!(
        sqlx::query("SELECT value FROM kv WHERE key=$1")
            .bind(key)
            .try_map(|r| Ok(r.try_get::<sqlx::types::Json<T>, _>(0)?.0))
            .fetch_optional(&mut *db)
    )
}

/** Set JSON value in KV table */
// Inserts the pair, or overwrites the stored value when `key` already exists.
pub async fn kv_set<T: Serialize>(db: &mut PgConnection, key: &str, value: &T) -> sqlx::Result<()> {
    serialized!(
        sqlx::query("INSERT INTO kv (key, value) VALUES ($1, $2) ON CONFLICT (key) DO UPDATE SET value=EXCLUDED.value")
            .bind(key)
            .bind(sqlx::types::Json(value))
            .execute(&mut *db)
    )?;
    Ok(())
}

/// Outcome of [`transfer_register`].
#[derive(Debug, PartialEq, Eq)]
pub enum RegistrationResult {
    /// `out_reserve_pub_reuse` was not set.
    Success,
    /// `out_reserve_pub_reuse` was set.
    ReservePubReuse,
}

/// Register a prepared transfer through the SQL function
/// `register_prepared_transfers`, stamped with the current time.
pub async fn transfer_register(
    db: &PgPool,
    req: &RegistrationRequest,
) -> sqlx::Result<RegistrationResult> {
    let ty: IncomingType = req.r#type.into();
    serialized!(
        sqlx::query(
            "SELECT out_reserve_pub_reuse FROM register_prepared_transfers($1,$2,$3,$4,$5,$6)"
        )
        .bind(ty)
        .bind(&req.account_pub)
        .bind(&req.authorization_pub)
        .bind(&req.authorization_sig)
        .bind(req.recurrent)
        .bind_timestamp(&Timestamp::now())
        .try_map(|r: PgRow| {
            Ok(if r.try_get_flag("out_reserve_pub_reuse")? {
                RegistrationResult::ReservePubReuse
            } else {
                RegistrationResult::Success
            })
        })
        .fetch_one(db)
    )
}

/// Delete a prepared transfer through the SQL function
/// `delete_prepared_transfers`; returns its `out_found` flag.
pub async fn transfer_unregister(db: &PgPool, req: &Unregistration) -> sqlx::Result<bool> {
    serialized!(
        sqlx::query("SELECT out_found FROM delete_prepared_transfers($1,$2)")
            .bind(&req.authorization_pub)
            .bind_timestamp(&Timestamp::now())
            .try_map(|r: PgRow| r.try_get_flag("out_found"))
            .fetch_one(db)
    )
}

#[cfg(test)]
mod test {
    use jiff::{Span, Timestamp, Zoned};
    use serde_json::json;
    use sqlx::{PgPool, Postgres, pool::PoolConnection, postgres::PgRow};
    use taler_api::{
        db::TypeHelper,
        notification::dummy_listen,
        subject::{IncomingSubject, OutgoingSubject},
    };
    use taler_common::{
        api_common::{EddsaPublicKey, HashCode, ShortHashCode},
        api_params::{History, Page},
        types::{
            amount::{amount, decimal},
            url,
            utils::now_sql_stable_ts,
        },
    };

    use crate::{
        constants::CONFIG_SOURCE,
        db::{
            self, AddIncomingResult, AddOutgoingResult, BounceResult, Initiated, OutFailureResult,
            TransferResult, TxIn, TxOut, TxOutKind, kv_get, kv_set, make_transfer,
            register_bounce_tx_in, register_tx_in, register_tx_in_admin, register_tx_out,
        },
        magnet_api::types::TxStatus,
        magnet_payto,
    };

    use super::TxInAdmin;

    async fn setup() -> (PoolConnection<Postgres>, PgPool) {
        taler_test_utils::db::db_test_setup(CONFIG_SOURCE).await
    }

    #[tokio::test]
    async fn kv() {
        let (mut db, _) = setup().await;

        let value = json!({
            "name": "Mr Smith",
            "no way": 32
        });

        assert_eq!(
            kv_get::<serde_json::Value>(&mut db, "value").await.unwrap(),
            None
        );
        kv_set(&mut db, "value", &value).await.unwrap();
        kv_set(&mut db, "value", &value).await.unwrap();
        assert_eq!(
            kv_get::<serde_json::Value>(&mut db, "value").await.unwrap(),
            Some(value)
); 935 } 936 937 #[tokio::test] 938 async fn tx_in() { 939 let (mut db, pool) = setup().await; 940 941 let mut routine = async |first: &Option<IncomingSubject>, 942 second: &Option<IncomingSubject>| { 943 let (id, code) = 944 sqlx::query("SELECT count(*) + 1, COALESCE(max(magnet_code), 0) + 20 FROM tx_in") 945 .try_map(|r: PgRow| Ok((r.try_get_u64(0)?, r.try_get_u64(1)?))) 946 .fetch_one(&mut *db) 947 .await 948 .unwrap(); 949 let now = now_sql_stable_ts(); 950 let date = Zoned::now().date(); 951 let later = date.tomorrow().unwrap(); 952 let tx = TxIn { 953 code, 954 amount: amount("EUR:10"), 955 subject: "subject".into(), 956 debtor: magnet_payto( 957 "payto://iban/HU30162000031000163100000000?receiver-name=name", 958 ), 959 value_date: date, 960 status: TxStatus::Completed, 961 }; 962 // Insert 963 assert_eq!( 964 register_tx_in(&mut db, &tx, first, &now) 965 .await 966 .expect("register tx in"), 967 AddIncomingResult::Success { 968 new: true, 969 pending: false, 970 row_id: id, 971 valued_at: date 972 } 973 ); 974 // Idempotent 975 assert_eq!( 976 register_tx_in( 977 &mut db, 978 &TxIn { 979 value_date: later, 980 ..tx.clone() 981 }, 982 first, 983 &now 984 ) 985 .await 986 .expect("register tx in"), 987 AddIncomingResult::Success { 988 new: false, 989 pending: false, 990 row_id: id, 991 valued_at: date 992 } 993 ); 994 // Many 995 assert_eq!( 996 register_tx_in( 997 &mut db, 998 &TxIn { 999 code: code + 1, 1000 value_date: later, 1001 ..tx 1002 }, 1003 second, 1004 &now 1005 ) 1006 .await 1007 .expect("register tx in"), 1008 AddIncomingResult::Success { 1009 new: true, 1010 pending: false, 1011 row_id: id + 1, 1012 valued_at: later 1013 } 1014 ); 1015 }; 1016 1017 // Empty db 1018 assert_eq!( 1019 db::revenue_history(&pool, &History::default(), dummy_listen) 1020 .await 1021 .unwrap(), 1022 Vec::new() 1023 ); 1024 assert_eq!( 1025 db::incoming_history(&pool, &History::default(), dummy_listen) 1026 .await 1027 .unwrap(), 1028 Vec::new() 1029 ); 1030 1031 // 
Regular transaction 1032 routine(&None, &None).await; 1033 1034 // Reserve transaction 1035 routine( 1036 &Some(IncomingSubject::Reserve(EddsaPublicKey::rand())), 1037 &Some(IncomingSubject::Reserve(EddsaPublicKey::rand())), 1038 ) 1039 .await; 1040 1041 // Kyc transaction 1042 routine( 1043 &Some(IncomingSubject::Kyc(EddsaPublicKey::rand())), 1044 &Some(IncomingSubject::Kyc(EddsaPublicKey::rand())), 1045 ) 1046 .await; 1047 1048 // History 1049 assert_eq!( 1050 db::revenue_history(&pool, &History::default(), dummy_listen) 1051 .await 1052 .unwrap() 1053 .len(), 1054 6 1055 ); 1056 assert_eq!( 1057 db::incoming_history(&pool, &History::default(), dummy_listen) 1058 .await 1059 .unwrap() 1060 .len(), 1061 4 1062 ); 1063 } 1064 1065 #[tokio::test] 1066 async fn tx_in_admin() { 1067 let (_, pool) = setup().await; 1068 1069 // Empty db 1070 assert_eq!( 1071 db::incoming_history(&pool, &History::default(), dummy_listen) 1072 .await 1073 .unwrap(), 1074 Vec::new() 1075 ); 1076 1077 let now = now_sql_stable_ts(); 1078 let later = now + Span::new().hours(2); 1079 let date = Zoned::now().date(); 1080 let tx = TxInAdmin { 1081 amount: amount("EUR:10"), 1082 subject: "subject".to_owned(), 1083 debtor: magnet_payto("payto://iban/HU30162000031000163100000000?receiver-name=name"), 1084 metadata: IncomingSubject::Reserve(EddsaPublicKey::rand()), 1085 }; 1086 // Insert 1087 assert_eq!( 1088 register_tx_in_admin(&pool, &tx, &now) 1089 .await 1090 .expect("register tx in"), 1091 AddIncomingResult::Success { 1092 new: true, 1093 pending: false, 1094 row_id: 1, 1095 valued_at: date 1096 } 1097 ); 1098 // Idempotent 1099 assert_eq!( 1100 register_tx_in_admin(&pool, &tx, &later) 1101 .await 1102 .expect("register tx in"), 1103 AddIncomingResult::Success { 1104 new: false, 1105 pending: false, 1106 row_id: 1, 1107 valued_at: date 1108 } 1109 ); 1110 // Many 1111 assert_eq!( 1112 register_tx_in_admin( 1113 &pool, 1114 &TxInAdmin { 1115 subject: "Other".to_owned(), 1116 metadata: 
IncomingSubject::Reserve(EddsaPublicKey::rand()), 1117 ..tx.clone() 1118 }, 1119 &later 1120 ) 1121 .await 1122 .expect("register tx in"), 1123 AddIncomingResult::Success { 1124 new: true, 1125 pending: false, 1126 row_id: 2, 1127 valued_at: date 1128 } 1129 ); 1130 1131 // History 1132 assert_eq!( 1133 db::incoming_history(&pool, &History::default(), dummy_listen) 1134 .await 1135 .unwrap() 1136 .len(), 1137 2 1138 ); 1139 } 1140 1141 #[tokio::test] 1142 async fn tx_out() { 1143 let (mut db, pool) = setup().await; 1144 1145 let mut routine = async |first: &TxOutKind, second: &TxOutKind| { 1146 let (id, code) = 1147 sqlx::query("SELECT count(*) + 1, COALESCE(max(magnet_code), 0) + 20 FROM tx_out") 1148 .try_map(|r: PgRow| Ok((r.try_get_u64(0)?, r.try_get_u64(1)?))) 1149 .fetch_one(&mut *db) 1150 .await 1151 .unwrap(); 1152 let now = now_sql_stable_ts(); 1153 let date = Zoned::now().date(); 1154 let later = date.tomorrow().unwrap(); 1155 let tx = TxOut { 1156 code, 1157 amount: amount("HUF:10"), 1158 subject: "subject".into(), 1159 creditor: magnet_payto( 1160 "payto://iban/HU30162000031000163100000000?receiver-name=name", 1161 ), 1162 value_date: date, 1163 status: TxStatus::Completed, 1164 }; 1165 assert!(matches!( 1166 make_transfer( 1167 &pool, 1168 &db::Transfer { 1169 request_uid: HashCode::rand(), 1170 amount: decimal("10"), 1171 exchange_base_url: url("https://exchange.test.com/"), 1172 metadata: None, 1173 wtid: ShortHashCode::rand(), 1174 creditor: tx.creditor.clone() 1175 }, 1176 &now 1177 ) 1178 .await 1179 .unwrap(), 1180 TransferResult::Success { .. 
}
            ));
            db::initiated_submit_success(&mut db, 1, &Timestamp::now(), tx.code)
                .await
                .expect("status success");

            // Insert
            assert_eq!(
                register_tx_out(&mut db, &tx, first, &now)
                    .await
                    .expect("register tx out"),
                AddOutgoingResult {
                    result: db::RegisterResult::known,
                    row_id: id,
                }
            );
            // Idempotent
            assert_eq!(
                register_tx_out(
                    &mut db,
                    &TxOut {
                        value_date: later,
                        ..tx.clone()
                    },
                    first,
                    &now
                )
                .await
                .expect("register tx out"),
                AddOutgoingResult {
                    result: db::RegisterResult::idempotent,
                    row_id: id,
                }
            );
            // Recovered
            assert_eq!(
                register_tx_out(
                    &mut db,
                    &TxOut {
                        code: code + 1,
                        value_date: later,
                        ..tx.clone()
                    },
                    second,
                    &now
                )
                .await
                .expect("register tx out"),
                AddOutgoingResult {
                    result: db::RegisterResult::recovered,
                    row_id: id + 1,
                }
            );
        };

        // Empty db
        assert_eq!(
            db::outgoing_history(&pool, &History::default(), dummy_listen)
                .await
                .unwrap(),
            Vec::new()
        );

        // Regular transaction
        routine(&TxOutKind::Simple, &TxOutKind::Simple).await;

        // Talerable transaction
        routine(
            &TxOutKind::Talerable(OutgoingSubject::rand()),
            &TxOutKind::Talerable(OutgoingSubject::rand()),
        )
        .await;

        // Bounced transaction
        routine(&TxOutKind::Bounce(21), &TxOutKind::Bounce(42)).await;

        // History
        assert_eq!(
            db::outgoing_history(&pool, &History::default(), dummy_listen)
                .await
                .unwrap()
                .len(),
            2
        );
    }

    /// Checks the results of `db::register_tx_out_failure` for an unknown code,
    /// for the code of a submitted transfer, and for a bounced incoming transaction.
    #[tokio::test]
    async fn tx_out_failure() {
        let (mut db, pool) = setup().await;
        let now = now_sql_stable_ts();

        // Unknown: no initiated transaction is matched, with or without a bounced code
        let unknown = OutFailureResult {
            initiated_id: None,
            new: false,
        };
        let without_bounced = db::register_tx_out_failure(&mut db, 42, None, &now)
            .await
            .unwrap();
        assert_eq!(without_bounced, unknown);
        let with_bounced = db::register_tx_out_failure(&mut db, 42, Some(12), &now)
            .await
            .unwrap();
        assert_eq!(with_bounced, unknown);

        // Initiated
        let payto = magnet_payto("payto://iban/HU30162000031000163100000000?receiver-name=name");
        let req = db::Transfer {
            request_uid: HashCode::rand(),
            amount: decimal("10"),
            exchange_base_url: url("https://exchange.test.com/"),
            metadata: None,
            wtid: ShortHashCode::rand(),
            creditor: payto.clone(),
        };
        let created = make_transfer(&pool, &req, &now).await.unwrap();
        assert_eq!(
            created,
            TransferResult::Success {
                id: 1,
                initiated_at: now
            }
        );
        db::initiated_submit_success(&mut db, 1, &Timestamp::now(), 34)
            .await
            .expect("status success");
        // The first report is expected to be new, the repeated one is not
        for expect_new in [true, false] {
            let failure = db::register_tx_out_failure(&mut db, 34, None, &now)
                .await
                .unwrap();
            assert_eq!(
                failure,
                OutFailureResult {
                    initiated_id: Some(1),
                    new: expect_new
                }
            );
        }

        // Recovered bounce
        let tx = TxIn {
            code: 12,
            amount: amount("HUF:11"),
            subject: "malformed transaction".into(),
            debtor: payto,
            value_date: Zoned::now().date(),
            status: TxStatus::Completed,
        };
        let bounced = db::register_bounce_tx_in(&mut db, &tx, "no reason", &now)
            .await
            .unwrap();
        assert_eq!(
            bounced,
            BounceResult {
                tx_id: 1,
                tx_new: true,
                bounce_id: 2,
                bounce_new: true
            }
        );
        // Same expectation as above: new on the first report only
        for expect_new in [true, false] {
            let failure = db::register_tx_out_failure(&mut db, 10, Some(12), &now)
                .await
                .unwrap();
            assert_eq!(
                failure,
                OutFailureResult {
                    initiated_id: Some(2),
                    new: expect_new
                }
            );
        }
    }

    /// Checks `make_transfer` results (insert, idempotent replay, request UID reuse,
    /// wtid reuse) and the lookups through `db::transfer_by_id` / `db::transfer_page`.
    #[tokio::test]
    async fn transfer() {
        let (_, pool) = setup().await;

        // Empty db
        assert!(db::transfer_by_id(&pool, 0).await.unwrap().is_none());
        let empty_page = db::transfer_page(&pool, &None, &Page::default())
            .await
            .unwrap();
        assert!(empty_page.is_empty());

        let now = now_sql_stable_ts();
        let later = now + Span::new().hours(2);
        let req = db::Transfer {
            request_uid: HashCode::rand(),
            amount: decimal("10"),
            exchange_base_url: url("https://exchange.test.com/"),
            metadata: None,
            wtid: ShortHashCode::rand(),
            creditor: magnet_payto("payto://iban/HU02162000031000164800000000?receiver-name=name"),
        };

        // Insert, then idempotent replay: the replay is expected to keep the first timestamp
        let first = TransferResult::Success {
            id: 1,
            initiated_at: now,
        };
        let inserted = make_transfer(&pool, &req, &now).await.expect("transfer");
        assert_eq!(inserted, first);
        let replayed = make_transfer(&pool, &req, &later).await.expect("transfer");
        assert_eq!(replayed, first);

        // Request UID reuse
        let same_uid = db::Transfer {
            wtid: ShortHashCode::rand(),
            ..req.clone()
        };
        let uid_reuse = make_transfer(&pool, &same_uid, &now)
            .await
            .expect("transfer");
        assert_eq!(uid_reuse, TransferResult::RequestUidReuse);

        // wtid reuse
        let same_wtid = db::Transfer {
            request_uid: HashCode::rand(),
            ..req.clone()
        };
        let wtid_reuse = make_transfer(&pool, &same_wtid, &now)
            .await
            .expect("transfer");
        assert_eq!(wtid_reuse, TransferResult::WtidReuse);

        // Many
        let fresh = db::Transfer {
            request_uid: HashCode::rand(),
            wtid: ShortHashCode::rand(),
            ..req
        };
        let second = make_transfer(&pool, &fresh, &later)
            .await
            .expect("transfer");
        assert_eq!(
            second,
            TransferResult::Success {
                id: 2,
                initiated_at: later
            }
        );

        // Get
        for (id, exists) in [(1, true), (2, true), (3, false)] {
            let found = db::transfer_by_id(&pool, id).await.unwrap();
            assert_eq!(found.is_some(), exists);
        }
        let page = db::transfer_page(&pool, &None, &Page::default())
            .await
            .unwrap();
        assert_eq!(page.len(), 2);
    }

    /// Checks `register_bounce_tx_in` for a new and an already registered incoming
    /// transaction, including repeated calls, and the resulting pending batch.
    #[tokio::test]
    async fn bounce() {
        let (mut db, _) = setup().await;

        let amount = amount("HUF:10");
        let payto = magnet_payto("payto://iban/HU30162000031000163100000000?receiver-name=name");
        let now = now_sql_stable_ts();
        let date = Zoned::now().date();

        // All incoming transactions of this test only differ by their code
        let incoming = |code| TxIn {
            code,
            amount: amount.clone(),
            subject: "subject".into(),
            debtor: payto.clone(),
            value_date: date,
            status: TxStatus::Completed,
        };

        // Empty db
        let nothing = db::pending_batch(&mut db, &now).await.unwrap();
        assert!(nothing.is_empty());

        // Insert
        let registered = register_tx_in(&mut db, &incoming(13), &None, &now)
            .await
            .expect("register tx in");
        assert_eq!(
            registered,
            AddIncomingResult::Success {
                new: true,
                pending: false,
                row_id: 1,
                valued_at: date
            }
        );

        // (code, tx_id, tx_new, bounce_id, bounce_new)
        let steps = [
            // Bounce
            (12, 2, true, 1, true),
            // Idempotent
            (12, 2, false, 1, false),
            // Bounce registered
            (13, 1, false, 2, true),
            // Idempotent registered
            (13, 1, false, 2, false),
        ];
        for (code, tx_id, tx_new, bounce_id, bounce_new) in steps {
            let outcome = register_bounce_tx_in(&mut db, &incoming(code), "good reason", &now)
                .await
                .expect("bounce");
            assert_eq!(
                outcome,
                BounceResult {
                    tx_id,
                    tx_new,
                    bounce_id,
                    bounce_new
                }
            );
        }

        // Batch
        let expected = [
            Initiated {
                id: 1,
                amount: amount.clone(),
                subject: "bounce: 12".into(),
                creditor: payto.clone(),
            },
            Initiated {
                id: 2,
                amount,
                subject: "bounce: 13".into(),
                creditor: payto,
            },
        ];
        let pending = db::pending_batch(&mut db, &now).await.unwrap();
        assert_eq!(pending, &expected);
    }

    /// Status updates targeting a transfer id that was never created must not fail.
    #[tokio::test]
    async fn status() {
        let (mut db, _) = setup().await;

        // Unknown transfer
        let failure =
            db::initiated_submit_permanent_failure(&mut db, 1, &Timestamp::now(), "msg").await;
        failure.unwrap();
        let success = db::initiated_submit_success(&mut db, 1, &Timestamp::now(), 12).await;
        success.unwrap();
    }

    /// Checks the size of `db::pending_batch` as transfers are created and then
    /// marked as submitted or permanently failed.
    #[tokio::test]
    async fn batch() {
        let (mut db, pool) = setup().await;
        let start = Timestamp::now();
        let creditor = magnet_payto("payto://iban/HU30162000031000163100000000?receiver-name=name");

        // Empty db
        let pendings = db::pending_batch(&mut db, &start)
            .await
            .expect("pending_batch");
        assert!(pendings.is_empty());

        // (transfers to create, expected batch size afterwards)
        let rounds = [
            // Some transfers
            (3, 3),
            // Max 100 txs in batch
            (100, 100),
        ];
        for (count, expected_len) in rounds {
            for i in 0..count {
                let req = db::Transfer {
                    request_uid: HashCode::rand(),
                    amount: decimal((i + 1).to_string()),
                    exchange_base_url: url("https://exchange.test.com/"),
                    metadata: None,
                    wtid: ShortHashCode::rand(),
                    creditor: creditor.clone(),
                };
                make_transfer(&pool, &req, &Timestamp::now())
                    .await
                    .expect("transfer");
            }
            let pendings = db::pending_batch(&mut db, &start)
                .await
                .expect("pending_batch");
            assert_eq!(pendings.len(), expected_len);
        }

        // Skip uploaded
        for id in 0..=10 {
            db::initiated_submit_success(&mut db, id, &Timestamp::now(), id)
                .await
                .expect("status success");
        }
        let pendings = db::pending_batch(&mut db, &start)
            .await
            .expect("pending_batch");
        assert_eq!(pendings.len(), 93);

        // Skip failed
        for id in 10..=20 {
            db::initiated_submit_permanent_failure(&mut db, id, &Timestamp::now(), "failure")
                .await
                .expect("status failure");
        }
        let pendings = db::pending_batch(&mut db, &start)
            .await
            .expect("pending_batch");
        assert_eq!(pendings.len(), 83);
    }
}