db.rs (49583B)
1 /* 2 This file is part of TALER 3 Copyright (C) 2025, 2026 Taler Systems SA 4 5 TALER is free software; you can redistribute it and/or modify it under the 6 terms of the GNU Affero General Public License as published by the Free Software 7 Foundation; either version 3, or (at your option) any later version. 8 9 TALER is distributed in the hope that it will be useful, but WITHOUT ANY 10 WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR 11 A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. 12 13 You should have received a copy of the GNU Affero General Public License along with 14 TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> 15 */ 16 17 use std::fmt::Display; 18 19 use compact_str::CompactString; 20 use jiff::Timestamp; 21 use serde::{Serialize, de::DeserializeOwned}; 22 use sqlx::{PgConnection, PgPool, QueryBuilder, Row, postgres::PgRow}; 23 use taler_api::{ 24 db::{BindHelper, TypeHelper, history, page}, 25 serialized, 26 subject::{IncomingSubject, OutgoingSubject, fmt_out_subject}, 27 }; 28 use taler_common::{ 29 api_common::{HashCode, ShortHashCode}, 30 api_params::{History, Page}, 31 api_revenue::RevenueIncomingBankTransaction, 32 api_transfer::{RegistrationRequest, Unregistration}, 33 api_wire::{ 34 IncomingBankTransaction, OutgoingBankTransaction, TransferListStatus, TransferState, 35 TransferStatus, 36 }, 37 config::Config, 38 db::IncomingType, 39 types::{ 40 amount::{Currency, Decimal}, 41 payto::{PaytoImpl as _, PaytoURI}, 42 }, 43 }; 44 use tokio::sync::watch::{Receiver, Sender}; 45 use url::Url; 46 47 use crate::{ 48 config::parse_db_cfg, 49 payto::{CyclosAccount, CyclosId, FullCyclosPayto}, 50 }; 51 52 const SCHEMA: &str = "cyclos"; 53 54 pub async fn pool(cfg: &Config) -> anyhow::Result<PgPool> { 55 let db = parse_db_cfg(cfg)?; 56 let pool = taler_common::db::pool(db.cfg, SCHEMA).await?; 57 Ok(pool) 58 } 59 60 pub async fn dbinit(cfg: &Config, reset: bool) -> anyhow::Result<PgPool> { 61 let db_cfg = parse_db_cfg(cfg)?; 62 let pool = taler_common::db::pool(db_cfg.cfg, SCHEMA).await?; 63 let mut db = pool.acquire().await?; 64 taler_common::db::dbinit(&mut db, db_cfg.sql_dir.as_ref(), SCHEMA, reset).await?; 65 Ok(pool) 66 } 67 68 pub async fn notification_listener( 69 pool: PgPool, 70 in_channel: Sender<i64>, 71 taler_in_channel: Sender<i64>, 72 out_channel: Sender<i64>, 73 taler_out_channel: Sender<i64>, 74 ) -> sqlx::Result<()> { 75 taler_api::notification::notification_listener!(&pool, 76 "tx_in" => (row_id: i64) { 77 in_channel.send_replace(row_id); 78 }, 79 "taler_in" => (row_id: i64) { 80 taler_in_channel.send_replace(row_id); 81 }, 82 "tx_out" => (row_id: i64) { 83 out_channel.send_replace(row_id); 84 }, 85 "taler_out" => (row_id: i64) { 86 taler_out_channel.send_replace(row_id); 87 } 88 ) 89 } 90 91 #[derive(Debug, Clone)] 92 pub struct TxIn { 93 pub transfer_id: i64, 94 pub tx_id: Option<i64>, 95 pub amount: Decimal, 96 pub subject: String, 97 pub debtor_id: i64, 98 pub debtor_name: CompactString, 99 pub valued_at: Timestamp, 100 } 101 102 impl Display for TxIn { 103 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { 104 let Self { 105 transfer_id, 106 tx_id, 107 amount, 108 subject, 109 valued_at, 110 debtor_id, 111 debtor_name, 112 } = self; 113 let tx_id = match tx_id { 114 Some(id) => format_args!(":{}", *id), 115 None => format_args!(""), 116 }; 117 write!( 118 f, 119 "{valued_at} {transfer_id}{tx_id} {amount} ({debtor_id} {debtor_name}) '{subject}'" 120 ) 121 } 122 } 123 124 #[derive(Debug, Clone)] 125 pub struct TxOut { 126 pub transfer_id: i64, 127 pub tx_id: Option<i64>, 128 pub amount: Decimal, 129 pub subject: String, 130 pub creditor_id: i64, 131 pub creditor_name: CompactString, 132 pub valued_at: Timestamp, 133 } 134 135 impl Display for TxOut { 136 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { 137 let Self { 138 transfer_id, 139 tx_id, 140 amount, 141 subject, 142 creditor_id, 143 creditor_name, 144 valued_at, 145 } = self; 146 let tx_id = match tx_id { 147 Some(id) => format_args!(":{}", *id), 148 None => format_args!(""), 149 }; 150 write!( 151 f, 152 "{valued_at} {transfer_id}{tx_id} {amount} ({creditor_id} {creditor_name}) '{subject}'" 153 ) 154 } 155 } 156 157 #[derive(Debug, PartialEq, Eq)] 158 pub struct Initiated { 159 pub id: i64, 160 pub amount: Decimal, 161 pub subject: String, 162 pub creditor_id: i64, 163 pub creditor_name: CompactString, 164 } 165 166 impl Display for Initiated { 167 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { 168 let Self { 169 id, 170 amount, 171 subject, 172 creditor_id, 173 creditor_name, 174 } = self; 175 write!( 176 f, 177 "{id} {amount} ({creditor_id} {creditor_name}) '{subject}'" 178 ) 179 } 180 } 181 182 #[derive(Debug, Clone)] 183 pub struct TxInAdmin { 184 pub amount: Decimal, 185 pub subject: String, 186 pub debtor_id: i64, 187 pub debtor_name: CompactString, 188 pub metadata: IncomingSubject, 189 } 190 191 #[derive(Debug, PartialEq, Eq)] 192 pub enum AddIncomingResult { 193 Success { 194 new: bool, 195 pending: bool, 196 row_id: u64, 197 valued_at: Timestamp, 198 }, 199 ReservePubReuse, 200 UnknownMapping, 201 MappingReuse, 202 } 203 204 /// Lock the database for worker execution 205 pub async fn worker_lock(e: &mut PgConnection) -> sqlx::Result<bool> { 206 sqlx::query("SELECT pg_try_advisory_lock(42)") 207 .try_map(|r: PgRow| r.try_get(0)) 208 .fetch_one(e) 209 .await 210 } 211 212 pub async fn register_tx_in_admin( 213 db: &PgPool, 214 tx: &TxInAdmin, 215 now: &Timestamp, 216 ) -> sqlx::Result<AddIncomingResult> { 217 serialized!( 218 sqlx::query( 219 " 220 SELECT out_reserve_pub_reuse, out_mapping_reuse, out_unknown_mapping, out_tx_row_id, out_valued_at, out_new, out_pending 221 FROM register_tx_in(NULL, NULL, $1, $2, $3, $4, $5, $6, $7, $5) 222 ", 223 ) 224 .bind(tx.amount) 225 .bind(&tx.subject) 226 .bind(tx.debtor_id) 227 .bind(&tx.debtor_name) 228 .bind_timestamp(now) 229 .bind(tx.metadata.ty()) 230 .bind(tx.metadata.key()) 231 .try_map(|r: PgRow| { 232 Ok(if r.try_get_flag(0)? { 233 AddIncomingResult::ReservePubReuse 234 } else if r.try_get_flag(1)? { 235 AddIncomingResult::MappingReuse 236 } else if r.try_get_flag(2)? { 237 AddIncomingResult::UnknownMapping 238 } else { 239 AddIncomingResult::Success { 240 row_id: r.try_get_u64(3)?, 241 valued_at: r.try_get_timestamp(4)?, 242 new: r.try_get(5)?, 243 pending: r.try_get(6)? 244 } 245 }) 246 }) 247 .fetch_one(db) 248 ) 249 } 250 251 pub async fn register_tx_in( 252 db: &mut PgConnection, 253 tx: &TxIn, 254 subject: &Option<IncomingSubject>, 255 now: &Timestamp, 256 ) -> sqlx::Result<AddIncomingResult> { 257 serialized!( 258 sqlx::query( 259 " 260 SELECT out_reserve_pub_reuse, out_mapping_reuse, out_unknown_mapping, out_tx_row_id, out_valued_at, out_new, out_pending 261 FROM register_tx_in($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) 262 ", 263 ) 264 .bind(tx.transfer_id) 265 .bind(tx.tx_id) 266 .bind(tx.amount) 267 .bind(&tx.subject) 268 .bind(tx.debtor_id) 269 .bind(&tx.debtor_name) 270 .bind(tx.valued_at.as_microsecond()) 271 .bind(subject.as_ref().map(|it| it.ty())) 272 .bind(subject.as_ref().map(|it| it.key())) 273 .bind(now.as_microsecond()) 274 .try_map(|r: PgRow| { 275 Ok(if r.try_get_flag(0)? { 276 AddIncomingResult::ReservePubReuse 277 } else if r.try_get_flag(1)? { 278 AddIncomingResult::MappingReuse 279 } else if r.try_get_flag(2)? { 280 AddIncomingResult::UnknownMapping 281 } else { 282 AddIncomingResult::Success { 283 row_id: r.try_get_u64(3)?, 284 valued_at: r.try_get_timestamp(4)?, 285 new: r.try_get(5)?, 286 pending: r.try_get(6)? 287 } 288 }) 289 }) 290 .fetch_one(&mut *db) 291 ) 292 } 293 294 #[derive(Debug)] 295 pub enum TxOutKind { 296 Simple, 297 Bounce(i64), 298 Talerable(OutgoingSubject), 299 } 300 301 #[derive(Debug, Clone, Copy, PartialEq, Eq, sqlx::Type)] 302 #[allow(non_camel_case_types)] 303 #[sqlx(type_name = "register_result")] 304 pub enum RegisterResult { 305 /// Already registered 306 idempotent, 307 /// Initiated transaction 308 known, 309 /// Recovered unknown outgoing transaction 310 recovered, 311 } 312 313 #[derive(Debug, PartialEq, Eq)] 314 pub struct AddOutgoingResult { 315 pub result: RegisterResult, 316 pub row_id: i64, 317 } 318 319 pub async fn register_tx_out( 320 db: &mut PgConnection, 321 tx: &TxOut, 322 kind: &TxOutKind, 323 now: &Timestamp, 324 ) -> sqlx::Result<AddOutgoingResult> { 325 serialized!({ 326 let query = sqlx::query( 327 " 328 SELECT out_result, out_tx_row_id 329 FROM register_tx_out($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12) 330 ", 331 ) 332 .bind(tx.transfer_id) 333 .bind(tx.tx_id) 334 .bind(tx.amount) 335 .bind(&tx.subject) 336 .bind(tx.creditor_id) 337 .bind(&tx.creditor_name) 338 .bind_timestamp(&tx.valued_at); 339 let query = match kind { 340 TxOutKind::Simple => query 341 .bind(None::<&[u8]>) 342 .bind(None::<&str>) 343 .bind(None::<&str>) 344 .bind(None::<i64>), 345 TxOutKind::Bounce(bounced) => query 346 .bind(None::<&[u8]>) 347 .bind(None::<&str>) 348 .bind(None::<&str>) 349 .bind(*bounced), 350 TxOutKind::Talerable(subject) => query 351 .bind(&subject.wtid) 352 .bind(subject.exchange_base_url.as_str()) 353 .bind(&subject.metadata) 354 .bind(None::<i64>), 355 }; 356 query 357 .bind_timestamp(now) 358 .try_map(|r: PgRow| { 359 Ok(AddOutgoingResult { 360 result: r.try_get(0)?, 361 row_id: r.try_get(1)?, 362 }) 363 }) 364 .fetch_one(&mut *db) 365 }) 366 } 367 368 #[derive(Debug, PartialEq, Eq)] 369 pub enum TransferResult { 370 Success { id: u64, initiated_at: Timestamp }, 371 RequestUidReuse, 372 WtidReuse, 373 } 374 375 #[derive(Debug, Clone)] 376 pub struct Transfer { 377 pub request_uid: HashCode, 378 pub amount: Decimal, 379 pub exchange_base_url: Url, 380 pub metadata: Option<CompactString>, 381 pub wtid: ShortHashCode, 382 pub creditor_id: i64, 383 pub creditor_name: CompactString, 384 } 385 386 pub async fn make_transfer( 387 db: &PgPool, 388 tx: &Transfer, 389 now: &Timestamp, 390 ) -> sqlx::Result<TransferResult> { 391 let subject = fmt_out_subject(&tx.wtid, &tx.exchange_base_url, tx.metadata.as_deref()); 392 serialized!( 393 sqlx::query( 394 " 395 SELECT out_request_uid_reuse, out_wtid_reuse, out_initiated_row_id, out_initiated_at 396 FROM taler_transfer($1, $2, $3, $4, $5, $6, $7, $8, $9) 397 ", 398 ) 399 .bind(&tx.request_uid) 400 .bind(&tx.wtid) 401 .bind(&subject) 402 .bind(tx.amount) 403 .bind(tx.exchange_base_url.as_str()) 404 .bind(&tx.metadata) 405 .bind(tx.creditor_id) 406 .bind(&tx.creditor_name) 407 .bind_timestamp(now) 408 .try_map(|r: PgRow| { 409 Ok(if r.try_get_flag(0)? { 410 TransferResult::RequestUidReuse 411 } else if r.try_get_flag(1)? { 412 TransferResult::WtidReuse 413 } else { 414 TransferResult::Success { 415 id: r.try_get_u64(2)?, 416 initiated_at: r.try_get_timestamp(3)?, 417 } 418 }) 419 }) 420 .fetch_one(db) 421 ) 422 } 423 424 #[derive(Debug, PartialEq, Eq)] 425 pub struct BounceResult { 426 pub tx_id: u64, 427 pub tx_new: bool, 428 } 429 430 pub async fn register_bounced_tx_in( 431 db: &mut PgConnection, 432 tx: &TxIn, 433 chargeback_id: i64, 434 reason: &str, 435 now: &Timestamp, 436 ) -> sqlx::Result<BounceResult> { 437 serialized!( 438 sqlx::query( 439 " 440 SELECT out_tx_row_id, out_tx_new 441 FROM register_bounced_tx_in($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) 442 ", 443 ) 444 .bind(tx.transfer_id) 445 .bind(tx.tx_id) 446 .bind(tx.amount) 447 .bind(&tx.subject) 448 .bind(tx.debtor_id) 449 .bind(&tx.debtor_name) 450 .bind_timestamp(&tx.valued_at) 451 .bind(chargeback_id) 452 .bind(reason) 453 .bind_timestamp(now) 454 .try_map(|r: PgRow| { 455 Ok(BounceResult { 456 tx_id: r.try_get_u64(0)?, 457 tx_new: r.try_get(1)?, 458 }) 459 }) 460 .fetch_one(&mut *db) 461 ) 462 } 463 464 pub async fn transfer_page( 465 db: &PgPool, 466 status: &Option<TransferState>, 467 currency: &Currency, 468 root: &CompactString, 469 params: &Page, 470 ) -> sqlx::Result<Vec<TransferListStatus>> { 471 page( 472 db, 473 "initiated_id", 474 params, 475 || { 476 let mut builder = QueryBuilder::new( 477 " 478 SELECT 479 initiated_id, 480 status, 481 amount, 482 credit_account, 483 credit_name, 484 initiated_at 485 FROM transfer 486 JOIN initiated USING (initiated_id) 487 WHERE 488 ", 489 ); 490 if let Some(status) = status { 491 builder.push(" status = ").push_bind(status).push(" AND "); 492 } 493 builder 494 }, 495 |r: PgRow| { 496 Ok(TransferListStatus { 497 row_id: r.try_get_safeu64(0)?, 498 status: r.try_get(1)?, 499 amount: r.try_get_amount(2, currency)?, 500 credit_account: r.try_get_cyclos_fullpaytouri(3, 4, root)?, 501 timestamp: r.try_get_timestamp(5)?.into(), 502 }) 503 }, 504 ) 505 .await 506 } 507 508 pub async fn outgoing_history( 509 db: &PgPool, 510 params: &History, 511 currency: &Currency, 512 root: &CompactString, 513 listen: impl FnOnce() -> Receiver<i64>, 514 ) -> sqlx::Result<Vec<OutgoingBankTransaction>> { 515 history( 516 db, 517 "tx_out_id", 518 params, 519 listen, 520 || { 521 QueryBuilder::new( 522 " 523 SELECT 524 tx_out_id, 525 amount, 526 credit_account, 527 credit_name, 528 valued_at, 529 exchange_base_url, 530 metadata, 531 wtid 532 FROM taler_out 533 JOIN tx_out USING (tx_out_id) 534 WHERE 535 ", 536 ) 537 }, 538 |r: PgRow| { 539 Ok(OutgoingBankTransaction { 540 row_id: r.try_get_safeu64(0)?, 541 amount: r.try_get_amount(1, currency)?, 542 debit_fee: None, 543 credit_account: r.try_get_cyclos_fullpaytouri(2, 3, root)?, 544 date: r.try_get_timestamp(4)?.into(), 545 exchange_base_url: r.try_get_url(5)?, 546 metadata: r.try_get(6)?, 547 wtid: r.try_get(7)?, 548 }) 549 }, 550 ) 551 .await 552 } 553 554 pub async fn incoming_history( 555 db: &PgPool, 556 params: &History, 557 currency: &Currency, 558 root: &CompactString, 559 listen: impl FnOnce() -> Receiver<i64>, 560 ) -> sqlx::Result<Vec<IncomingBankTransaction>> { 561 history( 562 db, 563 "tx_in_id", 564 params, 565 listen, 566 || { 567 QueryBuilder::new( 568 " 569 SELECT 570 type, 571 tx_in_id, 572 amount, 573 debit_account, 574 debit_name, 575 valued_at, 576 metadata, 577 authorization_pub, 578 authorization_sig 579 FROM taler_in 580 JOIN tx_in USING (tx_in_id) 581 WHERE 582 ", 583 ) 584 }, 585 |r: PgRow| { 586 Ok(match r.try_get(0)? { 587 IncomingType::reserve => IncomingBankTransaction::Reserve { 588 row_id: r.try_get_safeu64(1)?, 589 amount: r.try_get_amount(2, currency)?, 590 credit_fee: None, 591 debit_account: r.try_get_cyclos_fullpaytouri(3, 4, root)?, 592 date: r.try_get_timestamp(5)?.into(), 593 reserve_pub: r.try_get(6)?, 594 authorization_pub: r.try_get(7)?, 595 authorization_sig: r.try_get(8)?, 596 }, 597 IncomingType::kyc => IncomingBankTransaction::Kyc { 598 row_id: r.try_get_safeu64(1)?, 599 amount: r.try_get_amount(2, currency)?, 600 credit_fee: None, 601 debit_account: r.try_get_cyclos_fullpaytouri(3, 4, root)?, 602 date: r.try_get_timestamp(5)?.into(), 603 account_pub: r.try_get(6)?, 604 authorization_pub: r.try_get(7)?, 605 authorization_sig: r.try_get(8)?, 606 }, 607 IncomingType::map => unimplemented!("MAP are never listed in the history"), 608 }) 609 }, 610 ) 611 .await 612 } 613 614 pub async fn revenue_history( 615 db: &PgPool, 616 params: &History, 617 currency: &Currency, 618 root: &CompactString, 619 listen: impl FnOnce() -> Receiver<i64>, 620 ) -> sqlx::Result<Vec<RevenueIncomingBankTransaction>> { 621 history( 622 db, 623 "tx_in_id", 624 params, 625 listen, 626 || { 627 QueryBuilder::new( 628 " 629 SELECT 630 tx_in_id, 631 valued_at, 632 amount, 633 debit_account, 634 debit_name, 635 subject 636 FROM tx_in 637 WHERE 638 ", 639 ) 640 }, 641 |r: PgRow| { 642 Ok(RevenueIncomingBankTransaction { 643 row_id: r.try_get_safeu64(0)?, 644 date: r.try_get_timestamp(1)?.into(), 645 amount: r.try_get_amount(2, currency)?, 646 credit_fee: None, 647 debit_account: r.try_get_cyclos_fullpaytouri(3, 4, root)?, 648 subject: r.try_get(5)?, 649 }) 650 }, 651 ) 652 .await 653 } 654 655 pub async fn transfer_by_id( 656 db: &PgPool, 657 id: u64, 658 currency: &Currency, 659 root: &CompactString, 660 ) -> sqlx::Result<Option<TransferStatus>> { 661 serialized!( 662 sqlx::query( 663 " 664 SELECT 665 status, 666 status_msg, 667 amount, 668 exchange_base_url, 669 metadata, 670 wtid, 671 credit_account, 672 credit_name, 673 initiated_at 674 FROM transfer 675 JOIN initiated USING (initiated_id) 676 WHERE initiated_id = $1 677 ", 678 ) 679 .bind(id as i64) 680 .try_map(|r: PgRow| { 681 Ok(TransferStatus { 682 status: r.try_get(0)?, 683 status_msg: r.try_get(1)?, 684 amount: r.try_get_amount(2, currency)?, 685 origin_exchange_url: r.try_get(3)?, 686 metadata: r.try_get(4)?, 687 wtid: r.try_get(5)?, 688 credit_account: r.try_get_cyclos_fullpaytouri(6, 7, root)?, 689 timestamp: r.try_get_timestamp(8)?.into(), 690 }) 691 }) 692 .fetch_optional(db) 693 ) 694 } 695 696 /** Get a batch of pending initiated transactions not attempted since [start] */ 697 pub async fn pending_batch( 698 db: &mut PgConnection, 699 start: &Timestamp, 700 ) -> sqlx::Result<Vec<Initiated>> { 701 serialized!( 702 sqlx::query( 703 " 704 SELECT initiated_id, amount, subject, credit_account, credit_name 705 FROM initiated 706 WHERE tx_id IS NULL 707 AND status='pending' 708 AND (last_submitted IS NULL OR last_submitted < $1) 709 LIMIT 100 710 ", 711 ) 712 .bind_timestamp(start) 713 .try_map(|r: PgRow| { 714 Ok(Initiated { 715 id: r.try_get(0)?, 716 amount: r.try_get(1)?, 717 subject: r.try_get(2)?, 718 creditor_id: r.try_get(3)?, 719 creditor_name: r.try_get(4)?, 720 }) 721 }) 722 .fetch_all(&mut *db) 723 ) 724 } 725 726 /** Update status of a successful submitted initiated transaction */ 727 pub async fn initiated_submit_success( 728 db: &mut PgConnection, 729 initiated_id: i64, 730 timestamp: &Timestamp, 731 tx_id: i64, 732 ) -> sqlx::Result<()> { 733 serialized!( 734 sqlx::query( 735 " 736 UPDATE initiated 737 SET status='pending', submission_counter=submission_counter+1, last_submitted=$1, tx_id=$2 738 WHERE initiated_id=$3 739 " 740 ) 741 .bind_timestamp(timestamp) 742 .bind(tx_id) 743 .bind(initiated_id) 744 .execute(&mut *db) 745 )?; 746 Ok(()) 747 } 748 749 /** Update status of a permanently failed initiated transaction */ 750 pub async fn initiated_submit_permanent_failure( 751 db: &mut PgConnection, 752 initiated_id: i64, 753 msg: &str, 754 ) -> sqlx::Result<()> { 755 serialized!( 756 sqlx::query( 757 " 758 UPDATE initiated 759 SET status='permanent_failure', status_msg=$1 760 WHERE initiated_id=$2 761 ", 762 ) 763 .bind(msg) 764 .bind(initiated_id) 765 .execute(&mut *db) 766 )?; 767 Ok(()) 768 } 769 770 #[derive(Debug, Clone, Copy, PartialEq, Eq)] 771 pub enum ChargebackFailureResult { 772 Unknown, 773 Known(u64), 774 Idempotent(u64), 775 } 776 777 /** Update status of a charged back initiated transaction */ 778 pub async fn initiated_chargeback_failure( 779 db: &mut PgConnection, 780 transfer_id: i64, 781 ) -> sqlx::Result<ChargebackFailureResult> { 782 Ok(serialized!( 783 sqlx::query("SELECT out_initiated_id, out_new FROM register_charge_back_failure($1)") 784 .bind(transfer_id) 785 .try_map(|r: PgRow| { 786 let id = r.try_get_u64(0)?; 787 Ok(if id == 0 { 788 ChargebackFailureResult::Unknown 789 } else if r.try_get(1)? { 790 ChargebackFailureResult::Known(id) 791 } else { 792 ChargebackFailureResult::Idempotent(id) 793 }) 794 }) 795 .fetch_optional(&mut *db) 796 )? 797 .unwrap_or(ChargebackFailureResult::Unknown)) 798 } 799 800 /** Get JSON value from KV table */ 801 pub async fn kv_get<T: DeserializeOwned + Unpin + Send>( 802 db: &mut PgConnection, 803 key: &str, 804 ) -> sqlx::Result<Option<T>> { 805 serialized!( 806 sqlx::query("SELECT value FROM kv WHERE key=$1") 807 .bind(key) 808 .try_map(|r| Ok(r.try_get::<sqlx::types::Json<T>, _>(0)?.0)) 809 .fetch_optional(&mut *db) 810 ) 811 } 812 813 /** Set JSON value in KV table */ 814 pub async fn kv_set<T: Serialize>(db: &mut PgConnection, key: &str, value: &T) -> sqlx::Result<()> { 815 serialized!( 816 sqlx::query("INSERT INTO kv (key, value) VALUES ($1, $2) ON CONFLICT (key) DO UPDATE SET value=EXCLUDED.value") 817 .bind(key) 818 .bind(sqlx::types::Json(value)) 819 .execute(&mut *db) 820 )?; 821 Ok(()) 822 } 823 824 pub enum RegistrationResult { 825 Success, 826 ReservePubReuse, 827 } 828 829 pub async fn transfer_register( 830 db: &PgPool, 831 req: &RegistrationRequest, 832 ) -> sqlx::Result<RegistrationResult> { 833 let ty: IncomingType = req.r#type.into(); 834 serialized!( 835 sqlx::query( 836 "SELECT out_reserve_pub_reuse FROM register_prepared_transfers($1,$2,$3,$4,$5,$6)" 837 ) 838 .bind(ty) 839 .bind(&req.account_pub) 840 .bind(&req.authorization_pub) 841 .bind(&req.authorization_sig) 842 .bind(req.recurrent) 843 .bind_timestamp(&Timestamp::now()) 844 .try_map(|r: PgRow| { 845 Ok(if r.try_get_flag("out_reserve_pub_reuse")? { 846 RegistrationResult::ReservePubReuse 847 } else { 848 RegistrationResult::Success 849 }) 850 }) 851 .fetch_one(db) 852 ) 853 } 854 855 pub async fn transfer_unregister(db: &PgPool, req: &Unregistration) -> sqlx::Result<bool> { 856 serialized!( 857 sqlx::query("SELECT out_found FROM delete_prepared_transfers($1)") 858 .bind(&req.authorization_pub) 859 .try_map(|r: PgRow| r.try_get_flag("out_found")) 860 .fetch_one(db) 861 ) 862 } 863 864 pub trait CyclosTypeHelper { 865 fn try_get_cyclos_fullpayto<I: sqlx::ColumnIndex<Self>>( 866 &self, 867 idx: I, 868 name: I, 869 root: &CompactString, 870 ) -> sqlx::Result<FullCyclosPayto>; 871 fn try_get_cyclos_fullpaytouri<I: sqlx::ColumnIndex<Self>>( 872 &self, 873 idx: I, 874 name: I, 875 root: &CompactString, 876 ) -> sqlx::Result<PaytoURI>; 877 } 878 879 impl CyclosTypeHelper for PgRow { 880 fn try_get_cyclos_fullpayto<I: sqlx::ColumnIndex<Self>>( 881 &self, 882 idx: I, 883 name: I, 884 root: &CompactString, 885 ) -> sqlx::Result<FullCyclosPayto> { 886 let idx = self.try_get(idx)?; 887 let name = self.try_get(name)?; 888 Ok(FullCyclosPayto::new( 889 CyclosAccount { 890 id: CyclosId(idx), 891 root: root.clone(), 892 }, 893 name, 894 )) 895 } 896 fn try_get_cyclos_fullpaytouri<I: sqlx::ColumnIndex<Self>>( 897 &self, 898 idx: I, 899 name: I, 900 root: &CompactString, 901 ) -> sqlx::Result<PaytoURI> { 902 let idx = self.try_get(idx)?; 903 let name = self.try_get(name)?; 904 Ok(CyclosAccount { 905 id: CyclosId(idx), 906 root: root.clone(), 907 } 908 .as_full_payto(name)) 909 } 910 } 911 912 #[cfg(test)] 913 mod test { 914 use std::sync::LazyLock; 915 916 use compact_str::CompactString; 917 use jiff::{Span, Timestamp}; 918 use serde_json::json; 919 use sqlx::{PgPool, Postgres, Row as _, pool::PoolConnection, postgres::PgRow}; 920 use taler_api::{ 921 db::TypeHelper, 922 notification::dummy_listen, 923 subject::{IncomingSubject, OutgoingSubject}, 924 }; 925 use taler_common::{ 926 api_common::{EddsaPublicKey, HashCode, ShortHashCode}, 927 api_params::{History, Page}, 928 api_wire::TransferState, 929 types::{ 930 amount::{Currency, decimal}, 931 url, 932 utils::now_sql_stable_ts, 933 }, 934 }; 935 936 use crate::{ 937 constants::CONFIG_SOURCE, 938 db::{ 939 self, AddIncomingResult, AddOutgoingResult, BounceResult, ChargebackFailureResult, 940 Transfer, TransferResult, TxIn, TxInAdmin, TxOut, TxOutKind, 941 }, 942 }; 943 944 pub static CURRENCY: LazyLock<Currency> = LazyLock::new(|| "TEST".parse().unwrap()); 945 pub const ROOT: CompactString = CompactString::const_new("localhost"); 946 947 async fn setup() -> (PoolConnection<Postgres>, PgPool) { 948 taler_test_utils::db::db_test_setup(CONFIG_SOURCE).await 949 } 950 951 #[tokio::test] 952 async fn kv() { 953 let (mut db, _) = setup().await; 954 955 let value = json!({ 956 "name": "Mr Smith", 957 "no way": 32 958 }); 959 960 assert_eq!( 961 db::kv_get::<serde_json::Value>(&mut db, "value") 962 .await 963 .unwrap(), 964 None 965 ); 966 db::kv_set(&mut db, "value", &value).await.unwrap(); 967 db::kv_set(&mut db, "value", &value).await.unwrap(); 968 assert_eq!( 969 db::kv_get::<serde_json::Value>(&mut db, "value") 970 .await 971 .unwrap(), 972 Some(value) 973 ); 974 } 975 976 #[tokio::test] 977 async fn tx_in() { 978 let (mut db, pool) = setup().await; 979 980 let mut routine = async |first: &Option<IncomingSubject>, 981 second: &Option<IncomingSubject>| { 982 let id = sqlx::query("SELECT count(*) + 1 FROM tx_in") 983 .try_map(|r: PgRow| r.try_get_u64(0)) 984 .fetch_one(&mut *db) 985 .await 986 .unwrap(); 987 let now = now_sql_stable_ts(); 988 let later = now + Span::new().hours(2); 989 let tx = TxIn { 990 transfer_id: now.as_microsecond() as i64, 991 tx_id: None, 992 amount: decimal("10"), 993 subject: "subject".to_owned(), 994 debtor_id: 31000163100000000, 995 debtor_name: "Name".into(), 996 valued_at: now, 997 }; 998 // Insert 999 assert_eq!( 1000 db::register_tx_in(&mut db, &tx, first, &now) 1001 .await 1002 .expect("register tx in"), 1003 AddIncomingResult::Success { 1004 new: true, 1005 pending: false, 1006 row_id: id, 1007 valued_at: now, 1008 } 1009 ); 1010 // Idempotent 1011 assert_eq!( 1012 db::register_tx_in( 1013 &mut db, 1014 &TxIn { 1015 valued_at: later, 1016 ..tx.clone() 1017 }, 1018 first, 1019 &now 1020 ) 1021 .await 1022 .expect("register tx in"), 1023 AddIncomingResult::Success { 1024 new: false, 1025 pending: false, 1026 row_id: id, 1027 valued_at: now 1028 } 1029 ); 1030 // Many 1031 assert_eq!( 1032 db::register_tx_in( 1033 &mut db, 1034 &TxIn { 1035 transfer_id: later.as_microsecond() as i64, 1036 valued_at: later, 1037 ..tx 1038 }, 1039 second, 1040 &now 1041 ) 1042 .await 1043 .expect("register tx in"), 1044 AddIncomingResult::Success { 1045 new: true, 1046 pending: false, 1047 row_id: id + 1, 1048 valued_at: later 1049 } 1050 ); 1051 }; 1052 1053 // Empty db 1054 assert_eq!( 1055 db::revenue_history(&pool, &History::default(), &CURRENCY, &ROOT, dummy_listen) 1056 .await 1057 .unwrap(), 1058 Vec::new() 1059 ); 1060 assert_eq!( 1061 db::incoming_history(&pool, &History::default(), &CURRENCY, &ROOT, dummy_listen) 1062 .await 1063 .unwrap(), 1064 Vec::new() 1065 ); 1066 1067 // Regular transaction 1068 routine(&None, &None).await; 1069 1070 // Reserve transaction 1071 routine( 1072 &Some(IncomingSubject::Reserve(EddsaPublicKey::rand())), 1073 &Some(IncomingSubject::Reserve(EddsaPublicKey::rand())), 1074 ) 1075 .await; 1076 1077 // Kyc transaction 1078 routine( 1079 &Some(IncomingSubject::Kyc(EddsaPublicKey::rand())), 1080 &Some(IncomingSubject::Kyc(EddsaPublicKey::rand())), 1081 ) 1082 .await; 1083 1084 // History 1085 assert_eq!( 1086 db::revenue_history(&pool, &History::default(), &CURRENCY, &ROOT, dummy_listen) 1087 .await 1088 .unwrap() 1089 .len(), 1090 6 1091 ); 1092 assert_eq!( 1093 db::incoming_history(&pool, &History::default(), &CURRENCY, &ROOT, dummy_listen) 1094 .await 1095 .unwrap() 1096 .len(), 1097 4 1098 ); 1099 } 1100 1101 #[tokio::test] 1102 async fn tx_in_admin() { 1103 let (_, pool) = setup().await; 1104 1105 // Empty db 1106 assert_eq!( 1107 db::incoming_history(&pool, &History::default(), &CURRENCY, &ROOT, dummy_listen) 1108 .await 1109 .unwrap(), 1110 Vec::new() 1111 ); 1112 1113 let now = now_sql_stable_ts(); 1114 let later = now + Span::new().hours(2); 1115 let tx = TxInAdmin { 1116 amount: decimal("10"), 1117 subject: "subject".to_owned(), 1118 debtor_id: 31000163100000000, 1119 debtor_name: "Name".into(), 1120 metadata: IncomingSubject::Reserve(EddsaPublicKey::rand()), 1121 }; 1122 // Insert 1123 assert_eq!( 1124 db::register_tx_in_admin(&pool, &tx, &now) 1125 .await 1126 .expect("register tx in"), 1127 AddIncomingResult::Success { 1128 new: true, 1129 pending: false, 1130 row_id: 1, 1131 valued_at: now 1132 } 1133 ); 1134 // Idempotent 1135 assert_eq!( 1136 db::register_tx_in_admin(&pool, &tx, &later) 1137 .await 1138 .expect("register tx in"), 1139 AddIncomingResult::Success { 1140 new: false, 1141 pending: false, 1142 row_id: 1, 1143 valued_at: now 1144 } 1145 ); 1146 // Many 1147 assert_eq!( 1148 db::register_tx_in_admin( 1149 &pool, 1150 &TxInAdmin { 1151 subject: "Other".to_owned(), 1152 metadata: IncomingSubject::Reserve(EddsaPublicKey::rand()), 1153 ..tx.clone() 1154 }, 1155 &later 1156 ) 1157 .await 1158 .expect("register tx in"), 1159 AddIncomingResult::Success { 1160 new: true, 1161 pending: false, 1162 row_id: 2, 1163 valued_at: later 1164 } 1165 ); 1166 1167 // History 1168 assert_eq!( 1169 db::incoming_history(&pool, &History::default(), &CURRENCY, &ROOT, dummy_listen) 1170 .await 1171 .unwrap() 1172 .len(), 1173 2 1174 ); 1175 } 1176 1177 #[tokio::test] 1178 async fn tx_out() { 1179 let (mut db, pool) = setup().await; 1180 1181 let mut routine = async |first: &TxOutKind, second: &TxOutKind| { 1182 let transfer_id = sqlx::query("SELECT count(*) + 1 FROM tx_out") 1183 .try_map(|r: PgRow| r.try_get(0)) 1184 .fetch_one(&mut *db) 1185 .await 1186 .unwrap(); 1187 let now = now_sql_stable_ts(); 1188 let later = now + Span::new().hours(2); 1189 let tx = TxOut { 1190 transfer_id, 1191 tx_id: Some(transfer_id), 1192 amount: decimal("10"), 1193 subject: "subject".to_owned(), 1194 creditor_id: 31000163100000000, 1195 creditor_name: "Name".into(), 1196 valued_at: now, 1197 }; 1198 assert!(matches!( 1199 db::make_transfer( 1200 &pool, 1201 &Transfer { 1202 request_uid: HashCode::rand(), 1203 amount: decimal("10"), 1204 exchange_base_url: url("https://exchange.test.com/"), 1205 metadata: None, 1206 wtid: ShortHashCode::rand(), 1207 creditor_id: 31000163100000000, 1208 creditor_name: "Name".into() 1209 }, 1210 &now 1211 ) 1212 .await 1213 .unwrap(), 1214 TransferResult::Success { .. } 1215 )); 1216 db::initiated_submit_success(&mut db, 1, &Timestamp::now(), transfer_id) 1217 .await 1218 .expect("status success"); 1219 1220 // Insert 1221 assert_eq!( 1222 db::register_tx_out(&mut db, &tx, first, &now) 1223 .await 1224 .expect("register tx out"), 1225 AddOutgoingResult { 1226 result: db::RegisterResult::known, 1227 row_id: transfer_id, 1228 } 1229 ); 1230 // Idempotent 1231 assert_eq!( 1232 db::register_tx_out( 1233 &mut db, 1234 &TxOut { 1235 valued_at: later, 1236 ..tx.clone() 1237 }, 1238 first, 1239 &now 1240 ) 1241 .await 1242 .expect("register tx out"), 1243 AddOutgoingResult { 1244 result: db::RegisterResult::idempotent, 1245 row_id: transfer_id, 1246 } 1247 ); 1248 // Recovered 1249 assert_eq!( 1250 db::register_tx_out( 1251 &mut db, 1252 &TxOut { 1253 transfer_id: transfer_id + 1, 1254 tx_id: Some(transfer_id + 1), 1255 valued_at: later, 1256 ..tx.clone() 1257 }, 1258 second, 1259 &now 1260 ) 1261 .await 1262 .expect("register tx out"), 1263 AddOutgoingResult { 1264 result: db::RegisterResult::recovered, 1265 row_id: transfer_id + 1, 1266 } 1267 ); 1268 }; 1269 1270 // Empty db 1271 assert_eq!( 1272 db::outgoing_history(&pool, &History::default(), &CURRENCY, &ROOT, dummy_listen) 1273 .await 1274 .unwrap(), 1275 Vec::new() 1276 ); 1277 1278 // Regular transaction 1279 routine(&TxOutKind::Simple, &TxOutKind::Simple).await; 1280 1281 // Talerable transaction 1282 routine( 1283 &TxOutKind::Talerable(OutgoingSubject::rand()), 1284 &TxOutKind::Talerable(OutgoingSubject::rand()), 1285 ) 1286 .await; 1287 1288 // Bounced transaction 1289 routine(&TxOutKind::Bounce(21), &TxOutKind::Bounce(42)).await; 1290 1291 // History 1292 assert_eq!( 1293 db::outgoing_history(&pool, &History::default(), &CURRENCY, &ROOT, dummy_listen) 1294 .await 1295 .unwrap() 1296 .len(), 1297 2 1298 ); 1299 } 1300 1301 // TODO tx out failure 1302 1303 #[tokio::test] 1304 async fn transfer() { 1305 let (_, pool) = setup().await; 1306 1307 // Empty db 1308 assert_eq!( 1309 db::transfer_by_id(&pool, 0, &CURRENCY, &ROOT) 1310 .await 1311 .unwrap(), 1312 None 1313 ); 1314 assert_eq!( 1315 db::transfer_page(&pool, &None, &CURRENCY, &ROOT, &Page::default()) 1316 .await 1317 .unwrap(), 1318 Vec::new() 1319 ); 1320 1321 let req = Transfer { 1322 request_uid: HashCode::rand(), 1323 amount: decimal("10"), 1324 exchange_base_url: url("https://exchange.test.com/"), 1325 metadata: None, 1326 wtid: ShortHashCode::rand(), 1327 creditor_id: 31000163100000000, 1328 creditor_name: "Name".into(), 1329 }; 1330 let now = now_sql_stable_ts(); 1331 let later = now + Span::new().hours(2); 1332 // Insert 1333 assert_eq!( 1334 db::make_transfer(&pool, &req, &now) 1335 .await 1336 .expect("transfer"), 1337 TransferResult::Success { 1338 id: 1, 1339 initiated_at: now 1340 } 1341 ); 1342 // Idempotent 1343 assert_eq!( 1344 db::make_transfer(&pool, &req, &later) 1345 .await 1346 .expect("transfer"), 1347 TransferResult::Success { 1348 id: 1, 1349 initiated_at: now 1350 } 1351 ); 1352 // Request UID reuse 1353 assert_eq!( 1354 db::make_transfer( 1355 &pool, 1356 &Transfer { 1357 wtid: ShortHashCode::rand(), 1358 ..req.clone() 1359 }, 1360 &now 1361 ) 1362 .await 1363 .expect("transfer"), 1364 TransferResult::RequestUidReuse 1365 ); 1366 // wtid reuse 1367 assert_eq!( 1368 db::make_transfer( 1369 &pool, 1370 &Transfer { 1371 request_uid: HashCode::rand(), 1372 ..req.clone() 1373 }, 1374 &now 1375 ) 1376 .await 1377 .expect("transfer"), 1378 TransferResult::WtidReuse 1379 ); 1380 // Many 1381 assert_eq!( 1382 db::make_transfer( 1383 &pool, 1384 &Transfer { 1385 request_uid: HashCode::rand(), 1386 wtid: ShortHashCode::rand(), 1387 ..req 1388 }, 1389 &later 1390 ) 1391 .await 1392 .expect("transfer"), 1393 TransferResult::Success { 1394 id: 2, 1395 initiated_at: later 1396 } 1397 ); 1398 1399 // Get 1400 assert!( 1401 db::transfer_by_id(&pool, 1, &CURRENCY, &ROOT) 1402 .await 1403 .unwrap() 1404 .is_some() 1405 ); 1406 assert!( 1407 db::transfer_by_id(&pool, 2, &CURRENCY, &ROOT) 1408 .await 1409 .unwrap() 1410 .is_some() 1411 ); 1412 assert!( 1413 db::transfer_by_id(&pool, 3, &CURRENCY, &ROOT) 1414 .await 1415 .unwrap() 1416 .is_none() 1417 ); 1418 assert_eq!( 1419 db::transfer_page(&pool, &None, &CURRENCY, &ROOT, &Page::default()) 1420 .await 1421 .unwrap() 1422 .len(), 1423 2 1424 ); 1425 } 1426 1427 #[tokio::test] 1428 async fn bounce() { 1429 let (mut db, _) = setup().await; 1430 1431 let amount = decimal("10"); 1432 let now = now_sql_stable_ts(); 1433 1434 // Bounce 1435 assert_eq!( 1436 db::register_bounced_tx_in( 1437 &mut db, 1438 &TxIn { 1439 transfer_id: 12, 1440 tx_id: None, 1441 amount, 1442 subject: "subject".to_owned(), 1443 debtor_id: 31000163100000000, 1444 debtor_name: "Name".into(), 1445 valued_at: now 1446 }, 1447 22, 1448 "good reason", 1449 &now 1450 ) 1451 .await 1452 .expect("bounce"), 1453 BounceResult { 1454 tx_id: 1, 1455 tx_new: true 1456 } 1457 ); 1458 // Idempotent 1459 assert_eq!( 1460 db::register_bounced_tx_in( 1461 &mut db, 1462 &TxIn { 1463 transfer_id: 12, 1464 tx_id: None, 1465 amount, 1466 subject: "subject".to_owned(), 1467 debtor_id: 31000163100000000, 1468 debtor_name: "Name".into(), 1469 valued_at: now 1470 }, 1471 22, 1472 "good reason", 1473 &now 1474 ) 1475 .await 1476 .expect("bounce"), 1477 BounceResult { 1478 tx_id: 1, 1479 tx_new: false 1480 } 1481 ); 1482 1483 // Many 1484 assert_eq!( 1485 db::register_bounced_tx_in( 1486 &mut db, 1487 &TxIn { 1488 transfer_id: 13, 1489 tx_id: None, 1490 amount, 1491 subject: "subject".to_owned(), 1492 debtor_id: 31000163100000000, 1493 debtor_name: "Name".into(), 1494 valued_at: now 1495 }, 1496 23, 1497 "good reason", 1498 &now 1499 ) 1500 .await 1501 .expect("bounce"), 1502 BounceResult { 1503 tx_id: 2, 1504 tx_new: true 1505 } 1506 ); 1507 } 1508 1509 #[tokio::test] 1510 async fn status() { 1511 let (mut db, pool) = setup().await; 1512 1513 let check_status = async |id: u64, status: TransferState, msg: Option<&str>| { 1514 let transfer = db::transfer_by_id(&pool, id, &CURRENCY, &ROOT) 1515 .await 1516 .unwrap() 1517 .unwrap(); 1518 assert_eq!( 1519 (status, msg), 1520 (transfer.status, transfer.status_msg.as_deref()) 1521 ); 1522 }; 1523 1524 // Unknown transfer 1525 db::initiated_submit_permanent_failure(&mut db, 1, "msg") 1526 .await 1527 .unwrap(); 1528 db::initiated_submit_success(&mut db, 1, &Timestamp::now(), 12) 1529 .await 1530 .unwrap(); 1531 assert_eq!( 1532 db::initiated_chargeback_failure(&mut db, 1).await.unwrap(), 1533 ChargebackFailureResult::Unknown 1534 ); 1535 1536 // Failure 1537 db::make_transfer( 1538 &pool, 1539 &Transfer { 1540 request_uid: HashCode::rand(), 1541 amount: decimal("1"), 1542 exchange_base_url: url("https://exchange.test.com/"), 1543 metadata: None, 1544 wtid: ShortHashCode::rand(), 1545 creditor_id: 31000163100000000, 1546 creditor_name: "Name".into(), 1547 }, 1548 &Timestamp::now(), 1549 ) 1550 .await 1551 .expect("transfer"); 1552 check_status(1, TransferState::pending, None).await; 1553 db::initiated_submit_permanent_failure(&mut db, 1, "error status") 1554 .await 1555 .unwrap(); 1556 check_status(1, TransferState::permanent_failure, Some("error status")).await; 1557 1558 // Success 1559 db::make_transfer( 1560 &pool, 1561 &Transfer { 1562 request_uid: HashCode::rand(), 1563 amount: decimal("1"), 1564 exchange_base_url: url("https://exchange.test.com/"), 1565 metadata: None, 1566 wtid: ShortHashCode::rand(), 1567 creditor_id: 31000163100000000, 1568 creditor_name: "Name".into(), 1569 }, 1570 &Timestamp::now(), 1571 ) 1572 .await 1573 .expect("transfer"); 1574 check_status(2, TransferState::pending, None).await; 1575 db::initiated_submit_success(&mut db, 2, &Timestamp::now(), 3) 1576 .await 1577 .unwrap(); 1578 check_status(2, TransferState::pending, None).await; 1579 db::register_tx_out( 1580 &mut db, 1581 &TxOut { 1582 transfer_id: 5, 1583 tx_id: Some(3), 1584 amount: decimal("2"), 1585 subject: "".to_string(), 1586 creditor_id: 31000163100000000, 1587 creditor_name: "Name".into(), 1588 valued_at: Timestamp::now(), 1589 }, 1590 &TxOutKind::Simple, 1591 &Timestamp::now(), 1592 ) 1593 .await 1594 .unwrap(); 1595 check_status(2, TransferState::success, None).await; 1596 1597 // Chargeback 1598 assert_eq!( 1599 db::initiated_chargeback_failure(&mut db, 5).await.unwrap(), 1600 ChargebackFailureResult::Known(2) 1601 ); 1602 check_status(2, TransferState::late_failure, Some("charged back")).await; 1603 assert_eq!( 1604 db::initiated_chargeback_failure(&mut db, 5).await.unwrap(), 1605 ChargebackFailureResult::Idempotent(2) 1606 ); 1607 } 1608 1609 #[tokio::test] 1610 async fn batch() { 1611 let (mut db, pool) = setup().await; 1612 let start = Timestamp::now(); 1613 1614 // Empty db 1615 let pendings = db::pending_batch(&mut db, &start) 1616 .await 1617 .expect("pending_batch"); 1618 assert_eq!(pendings.len(), 0); 1619 1620 // Some transfers 1621 for i in 0..3 { 1622 db::make_transfer( 1623 &pool, 1624 &Transfer { 1625 request_uid: HashCode::rand(), 1626 amount: decimal(format!("{}", i + 1)), 1627 exchange_base_url: url("https://exchange.test.com/"), 1628 metadata: None, 1629 wtid: ShortHashCode::rand(), 1630 creditor_id: 31000163100000000, 1631 creditor_name: "Name".into(), 1632 }, 1633 &Timestamp::now(), 1634 ) 1635 .await 1636 .expect("transfer"); 1637 } 1638 let pendings = db::pending_batch(&mut db, &start) 1639 .await 1640 .expect("pending_batch"); 1641 assert_eq!(pendings.len(), 3); 1642 1643 // Max 100 txs in batch 1644 for i in 0..100 { 1645 db::make_transfer( 1646 &pool, 1647 &Transfer { 1648 request_uid: HashCode::rand(), 1649 amount: decimal(format!("{}", i + 1)), 1650 exchange_base_url: url("https://exchange.test.com/"), 1651 metadata: None, 1652 wtid: ShortHashCode::rand(), 1653 creditor_id: 31000163100000000, 1654 creditor_name: "Name".into(), 1655 }, 1656 &Timestamp::now(), 1657 ) 1658 .await 1659 .expect("transfer"); 1660 } 1661 let pendings = db::pending_batch(&mut db, &start) 1662 .await 1663 .expect("pending_batch"); 1664 assert_eq!(pendings.len(), 100); 1665 1666 // Skip uploaded 1667 for i in 0..=10 { 1668 db::initiated_submit_success(&mut db, i, &Timestamp::now(), i) 1669 .await 1670 .expect("status success"); 1671 } 1672 let pendings = db::pending_batch(&mut db, &start) 1673 .await 1674 .expect("pending_batch"); 1675 assert_eq!(pendings.len(), 93); 1676 1677 // Skip failed 1678 for i in 0..=10 { 1679 db::initiated_submit_permanent_failure(&mut db, 10 + i, "failure") 1680 .await 1681 .expect("status failure"); 1682 } 1683 let pendings = db::pending_batch(&mut db, &start) 1684 .await 1685 .expect("pending_batch"); 1686 assert_eq!(pendings.len(), 83); 1687 } 1688 }