db.rs (49090B)
1 /* 2 This file is part of TALER 3 Copyright (C) 2025, 2026 Taler Systems SA 4 5 TALER is free software; you can redistribute it and/or modify it under the 6 terms of the GNU Affero General Public License as published by the Free Software 7 Foundation; either version 3, or (at your option) any later version. 8 9 TALER is distributed in the hope that it will be useful, but WITHOUT ANY 10 WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR 11 A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. 12 13 You should have received a copy of the GNU Affero General Public License along with 14 TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> 15 */ 16 17 use std::fmt::Display; 18 19 use compact_str::CompactString; 20 use jiff::Timestamp; 21 use serde::{Serialize, de::DeserializeOwned}; 22 use sqlx::{PgConnection, PgPool, QueryBuilder, Row, postgres::PgRow}; 23 use taler_api::{ 24 db::{BindHelper, TypeHelper, history, page}, 25 serialized, 26 subject::{IncomingSubject, OutgoingSubject, fmt_out_subject}, 27 }; 28 use taler_common::{ 29 api::{ 30 HashCode, ShortHashCode, 31 params::{History, Page}, 32 prepared::{RegistrationRequest, Unregistration}, 33 revenue::RevenueIncomingBankTransaction, 34 wire::{ 35 IncomingBankTransaction, OutgoingBankTransaction, TransferListStatus, TransferState, 36 TransferStatus, 37 }, 38 }, 39 config::Config, 40 db::IncomingType, 41 types::{ 42 amount::{Currency, Decimal}, 43 payto::{PaytoImpl as _, PaytoURI}, 44 }, 45 }; 46 use tokio::sync::watch::{Receiver, Sender}; 47 use url::Url; 48 49 use crate::{ 50 config::parse_db_cfg, 51 payto::{CyclosAccount, CyclosId, FullCyclosPayto}, 52 }; 53 54 const SCHEMA: &str = "cyclos"; 55 56 pub async fn pool(cfg: &Config) -> anyhow::Result<PgPool> { 57 let db = parse_db_cfg(cfg)?; 58 let pool = taler_common::db::pool(db.cfg, SCHEMA).await?; 59 Ok(pool) 60 } 61 62 pub async fn dbinit(cfg: &Config, reset: bool) -> anyhow::Result<PgPool> { 63 let db_cfg = parse_db_cfg(cfg)?; 64 let pool = taler_common::db::pool(db_cfg.cfg, SCHEMA).await?; 65 let mut db = pool.acquire().await?; 66 taler_common::db::dbinit(&mut db, db_cfg.sql_dir.as_ref(), SCHEMA, reset).await?; 67 Ok(pool) 68 } 69 70 pub async fn notification_listener( 71 pool: PgPool, 72 in_channel: Sender<i64>, 73 taler_in_channel: Sender<i64>, 74 out_channel: Sender<i64>, 75 taler_out_channel: Sender<i64>, 76 ) { 77 taler_api::notification::notification_listener!(&pool, 78 "tx_in" => (row_id: i64) { 79 in_channel.send_replace(row_id); 80 }, 81 "taler_in" => (row_id: i64) { 82 taler_in_channel.send_replace(row_id); 83 }, 84 "tx_out" => (row_id: i64) { 85 out_channel.send_replace(row_id); 86 }, 87 "taler_out" => (row_id: i64) { 88 taler_out_channel.send_replace(row_id); 89 } 90 ) 91 } 92 93 #[derive(Debug, Clone)] 94 pub struct TxIn { 95 pub transfer_id: i64, 96 pub tx_id: Option<i64>, 97 pub amount: Decimal, 98 pub subject: String, 99 pub debtor_id: i64, 100 pub debtor_name: CompactString, 101 pub valued_at: Timestamp, 102 } 103 104 impl Display for TxIn { 105 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { 106 let Self { 107 transfer_id, 108 tx_id, 109 amount, 110 subject, 111 valued_at, 112 debtor_id, 113 debtor_name, 114 } = self; 115 let tx_id = match tx_id { 116 Some(id) => format_args!(":{}", *id), 117 None => format_args!(""), 118 }; 119 write!( 120 f, 121 "{valued_at} {transfer_id}{tx_id} {amount} ({debtor_id} {debtor_name}) '{subject}'" 122 ) 123 } 124 } 125 126 #[derive(Debug, Clone)] 127 pub struct TxOut { 128 pub transfer_id: i64, 129 pub tx_id: Option<i64>, 130 pub amount: Decimal, 131 pub subject: String, 132 pub creditor_id: i64, 133 pub creditor_name: CompactString, 134 pub valued_at: Timestamp, 135 } 136 137 impl Display for TxOut { 138 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { 139 let Self { 140 transfer_id, 141 tx_id, 142 amount, 143 subject, 144 creditor_id, 145 creditor_name, 146 valued_at, 147 } = self; 148 let tx_id = match tx_id { 149 Some(id) => format_args!(":{}", *id), 150 None => format_args!(""), 151 }; 152 write!( 153 f, 154 "{valued_at} {transfer_id}{tx_id} {amount} ({creditor_id} {creditor_name}) '{subject}'" 155 ) 156 } 157 } 158 159 #[derive(Debug, PartialEq, Eq)] 160 pub struct Initiated { 161 pub id: i64, 162 pub amount: Decimal, 163 pub subject: String, 164 pub creditor_id: i64, 165 pub creditor_name: CompactString, 166 } 167 168 impl Display for Initiated { 169 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { 170 let Self { 171 id, 172 amount, 173 subject, 174 creditor_id, 175 creditor_name, 176 } = self; 177 write!( 178 f, 179 "{id} {amount} ({creditor_id} {creditor_name}) '{subject}'" 180 ) 181 } 182 } 183 184 #[derive(Debug, Clone)] 185 pub struct TxInAdmin { 186 pub amount: Decimal, 187 pub subject: String, 188 pub debtor_id: i64, 189 pub debtor_name: CompactString, 190 pub metadata: IncomingSubject, 191 } 192 193 #[derive(Debug, PartialEq, Eq)] 194 pub enum AddIncomingResult { 195 Success { 196 new: bool, 197 pending: bool, 198 row_id: u64, 199 valued_at: Timestamp, 200 }, 201 ReservePubReuse, 202 UnknownMapping, 203 MappingReuse, 204 } 205 206 /// Lock the database for worker execution 207 pub async fn worker_lock(e: &mut PgConnection) -> sqlx::Result<bool> { 208 sqlx::query("SELECT pg_try_advisory_lock(42)") 209 .try_map(|r: PgRow| r.try_get(0)) 210 .fetch_one(e) 211 .await 212 } 213 214 pub async fn register_tx_in_admin( 215 db: &PgPool, 216 tx: &TxInAdmin, 217 now: &Timestamp, 218 ) -> sqlx::Result<AddIncomingResult> { 219 serialized!( 220 sqlx::query( 221 " 222 SELECT out_reserve_pub_reuse, out_mapping_reuse, out_unknown_mapping, out_tx_row_id, out_valued_at, out_new, out_pending 223 FROM register_tx_in(NULL, NULL, $1, $2, $3, $4, $5, $6, $7, $5) 224 ", 225 ) 226 .bind(tx.amount) 227 .bind(&tx.subject) 228 .bind(tx.debtor_id) 229 .bind(&tx.debtor_name) 230 .bind_timestamp(now) 231 .bind(tx.metadata.ty()) 232 .bind(tx.metadata.key()) 233 .try_map(|r: PgRow| { 234 Ok(if r.try_get_flag(0)? { 235 AddIncomingResult::ReservePubReuse 236 } else if r.try_get_flag(1)? { 237 AddIncomingResult::MappingReuse 238 } else if r.try_get_flag(2)? { 239 AddIncomingResult::UnknownMapping 240 } else { 241 AddIncomingResult::Success { 242 row_id: r.try_get_u64(3)?, 243 valued_at: r.try_get_timestamp(4)?, 244 new: r.try_get(5)?, 245 pending: r.try_get(6)? 246 } 247 }) 248 }) 249 .fetch_one(db) 250 ) 251 } 252 253 pub async fn register_tx_in( 254 db: &mut PgConnection, 255 tx: &TxIn, 256 subject: &Option<IncomingSubject>, 257 now: &Timestamp, 258 ) -> sqlx::Result<AddIncomingResult> { 259 serialized!( 260 sqlx::query( 261 " 262 SELECT out_reserve_pub_reuse, out_mapping_reuse, out_unknown_mapping, out_tx_row_id, out_valued_at, out_new, out_pending 263 FROM register_tx_in($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) 264 ", 265 ) 266 .bind(tx.transfer_id) 267 .bind(tx.tx_id) 268 .bind(tx.amount) 269 .bind(&tx.subject) 270 .bind(tx.debtor_id) 271 .bind(&tx.debtor_name) 272 .bind(tx.valued_at.as_microsecond()) 273 .bind(subject.as_ref().map(|it| it.ty())) 274 .bind(subject.as_ref().map(|it| it.key())) 275 .bind(now.as_microsecond()) 276 .try_map(|r: PgRow| { 277 Ok(if r.try_get_flag(0)? { 278 AddIncomingResult::ReservePubReuse 279 } else if r.try_get_flag(1)? { 280 AddIncomingResult::MappingReuse 281 } else if r.try_get_flag(2)? { 282 AddIncomingResult::UnknownMapping 283 } else { 284 AddIncomingResult::Success { 285 row_id: r.try_get_u64(3)?, 286 valued_at: r.try_get_timestamp(4)?, 287 new: r.try_get(5)?, 288 pending: r.try_get(6)? 289 } 290 }) 291 }) 292 .fetch_one(&mut *db) 293 ) 294 } 295 296 #[derive(Debug)] 297 pub enum TxOutKind { 298 Simple, 299 Bounce(i64), 300 Talerable(OutgoingSubject), 301 } 302 303 #[derive(Debug, Clone, Copy, PartialEq, Eq, sqlx::Type)] 304 #[allow(non_camel_case_types)] 305 #[sqlx(type_name = "register_result")] 306 pub enum RegisterResult { 307 /// Already registered 308 idempotent, 309 /// Initiated transaction 310 known, 311 /// Recovered unknown outgoing transaction 312 recovered, 313 } 314 315 #[derive(Debug, PartialEq, Eq)] 316 pub struct AddOutgoingResult { 317 pub result: RegisterResult, 318 pub row_id: i64, 319 } 320 321 pub async fn register_tx_out( 322 db: &mut PgConnection, 323 tx: &TxOut, 324 kind: &TxOutKind, 325 now: &Timestamp, 326 ) -> sqlx::Result<AddOutgoingResult> { 327 serialized!({ 328 let query = sqlx::query( 329 " 330 SELECT out_result, out_tx_row_id 331 FROM register_tx_out($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12) 332 ", 333 ) 334 .bind(tx.transfer_id) 335 .bind(tx.tx_id) 336 .bind(tx.amount) 337 .bind(&tx.subject) 338 .bind(tx.creditor_id) 339 .bind(&tx.creditor_name) 340 .bind_timestamp(&tx.valued_at); 341 let query = match kind { 342 TxOutKind::Simple => query 343 .bind(None::<&[u8]>) 344 .bind(None::<&str>) 345 .bind(None::<&str>) 346 .bind(None::<i64>), 347 TxOutKind::Bounce(bounced) => query 348 .bind(None::<&[u8]>) 349 .bind(None::<&str>) 350 .bind(None::<&str>) 351 .bind(*bounced), 352 TxOutKind::Talerable(subject) => query 353 .bind(&subject.wtid) 354 .bind(subject.exchange_base_url.as_str()) 355 .bind(&subject.metadata) 356 .bind(None::<i64>), 357 }; 358 query 359 .bind_timestamp(now) 360 .try_map(|r: PgRow| { 361 Ok(AddOutgoingResult { 362 result: r.try_get(0)?, 363 row_id: r.try_get(1)?, 364 }) 365 }) 366 .fetch_one(&mut *db) 367 }) 368 } 369 370 #[derive(Debug, PartialEq, Eq)] 371 pub enum TransferResult { 372 Success { id: u64, initiated_at: Timestamp }, 373 RequestUidReuse, 374 WtidReuse, 375 } 376 377 #[derive(Debug, Clone)] 378 pub struct Transfer { 379 pub request_uid: HashCode, 380 pub amount: Decimal, 381 pub exchange_base_url: Url, 382 pub metadata: Option<CompactString>, 383 pub wtid: ShortHashCode, 384 pub creditor_id: i64, 385 pub creditor_name: CompactString, 386 } 387 388 pub async fn make_transfer( 389 db: &PgPool, 390 tx: &Transfer, 391 now: &Timestamp, 392 ) -> sqlx::Result<TransferResult> { 393 let subject = fmt_out_subject(&tx.wtid, &tx.exchange_base_url, tx.metadata.as_deref()); 394 serialized!( 395 sqlx::query( 396 " 397 SELECT out_request_uid_reuse, out_wtid_reuse, out_initiated_row_id, out_initiated_at 398 FROM taler_transfer($1, $2, $3, $4, $5, $6, $7, $8, $9) 399 ", 400 ) 401 .bind(&tx.request_uid) 402 .bind(&tx.wtid) 403 .bind(&subject) 404 .bind(tx.amount) 405 .bind(tx.exchange_base_url.as_str()) 406 .bind(&tx.metadata) 407 .bind(tx.creditor_id) 408 .bind(&tx.creditor_name) 409 .bind_timestamp(now) 410 .try_map(|r: PgRow| { 411 Ok(if r.try_get_flag(0)? { 412 TransferResult::RequestUidReuse 413 } else if r.try_get_flag(1)? { 414 TransferResult::WtidReuse 415 } else { 416 TransferResult::Success { 417 id: r.try_get_u64(2)?, 418 initiated_at: r.try_get_timestamp(3)?, 419 } 420 }) 421 }) 422 .fetch_one(db) 423 ) 424 } 425 426 #[derive(Debug, PartialEq, Eq)] 427 pub struct BounceResult { 428 pub tx_id: u64, 429 pub tx_new: bool, 430 } 431 432 pub async fn register_bounced_tx_in( 433 db: &mut PgConnection, 434 tx: &TxIn, 435 chargeback_id: i64, 436 reason: &str, 437 now: &Timestamp, 438 ) -> sqlx::Result<BounceResult> { 439 serialized!( 440 sqlx::query( 441 " 442 SELECT out_tx_row_id, out_tx_new 443 FROM register_bounced_tx_in($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) 444 ", 445 ) 446 .bind(tx.transfer_id) 447 .bind(tx.tx_id) 448 .bind(tx.amount) 449 .bind(&tx.subject) 450 .bind(tx.debtor_id) 451 .bind(&tx.debtor_name) 452 .bind_timestamp(&tx.valued_at) 453 .bind(chargeback_id) 454 .bind(reason) 455 .bind_timestamp(now) 456 .try_map(|r: PgRow| { 457 Ok(BounceResult { 458 tx_id: r.try_get_u64(0)?, 459 tx_new: r.try_get(1)?, 460 }) 461 }) 462 .fetch_one(&mut *db) 463 ) 464 } 465 466 pub async fn transfer_page( 467 db: &PgPool, 468 status: &Option<TransferState>, 469 currency: &Currency, 470 root: &CompactString, 471 params: &Page, 472 ) -> sqlx::Result<Vec<TransferListStatus>> { 473 page( 474 db, 475 params, 476 "initiated_id", 477 || { 478 let mut builder = QueryBuilder::new( 479 " 480 SELECT 481 initiated_id, 482 status, 483 amount, 484 credit_account, 485 credit_name, 486 initiated_at 487 FROM transfer 488 JOIN initiated USING (initiated_id) 489 WHERE 490 ", 491 ); 492 if let Some(status) = status { 493 builder.push(" status = ").push_bind(status).push(" AND "); 494 } 495 builder 496 }, 497 |r: PgRow| { 498 Ok(TransferListStatus { 499 row_id: r.try_get_u64(0)?, 500 status: r.try_get(1)?, 501 amount: r.try_get_amount(2, currency)?, 502 credit_account: r.try_get_cyclos_fullpaytouri(3, 4, root)?, 503 timestamp: r.try_get_timestamp(5)?.into(), 504 }) 505 }, 506 ) 507 .await 508 } 509 510 pub async fn outgoing_history( 511 db: &PgPool, 512 params: &History, 513 currency: &Currency, 514 root: &CompactString, 515 listen: impl FnOnce() -> Receiver<i64>, 516 ) -> sqlx::Result<Vec<OutgoingBankTransaction>> { 517 history( 518 db, 519 "tx_out_id", 520 params, 521 listen, 522 || { 523 QueryBuilder::new( 524 " 525 SELECT 526 tx_out_id, 527 amount, 528 credit_account, 529 credit_name, 530 valued_at, 531 exchange_base_url, 532 metadata, 533 wtid 534 FROM taler_out 535 JOIN tx_out USING (tx_out_id) 536 WHERE 537 ", 538 ) 539 }, 540 |r: PgRow| { 541 Ok(OutgoingBankTransaction { 542 row_id: r.try_get_u64(0)?, 543 amount: r.try_get_amount(1, currency)?, 544 debit_fee: None, 545 credit_account: r.try_get_cyclos_fullpaytouri(2, 3, root)?, 546 date: r.try_get_timestamp(4)?.into(), 547 exchange_base_url: r.try_get_url(5)?, 548 metadata: r.try_get(6)?, 549 wtid: r.try_get(7)?, 550 }) 551 }, 552 ) 553 .await 554 } 555 556 pub async fn incoming_history( 557 db: &PgPool, 558 params: &History, 559 currency: &Currency, 560 root: &CompactString, 561 listen: impl FnOnce() -> Receiver<i64>, 562 ) -> sqlx::Result<Vec<IncomingBankTransaction>> { 563 history( 564 db, 565 "tx_in_id", 566 params, 567 listen, 568 || { 569 QueryBuilder::new( 570 " 571 SELECT 572 type, 573 tx_in_id, 574 amount, 575 debit_account, 576 debit_name, 577 valued_at, 578 metadata, 579 authorization_pub, 580 authorization_sig 581 FROM taler_in 582 JOIN tx_in USING (tx_in_id) 583 WHERE 584 ", 585 ) 586 }, 587 |r: PgRow| { 588 Ok(match r.try_get(0)? { 589 IncomingType::reserve => IncomingBankTransaction::Reserve { 590 row_id: r.try_get_u64(1)?, 591 amount: r.try_get_amount(2, currency)?, 592 credit_fee: None, 593 debit_account: r.try_get_cyclos_fullpaytouri(3, 4, root)?, 594 date: r.try_get_timestamp(5)?.into(), 595 reserve_pub: r.try_get(6)?, 596 authorization_pub: r.try_get(7)?, 597 authorization_sig: r.try_get(8)?, 598 }, 599 IncomingType::kyc => IncomingBankTransaction::Kyc { 600 row_id: r.try_get_u64(1)?, 601 amount: r.try_get_amount(2, currency)?, 602 credit_fee: None, 603 debit_account: r.try_get_cyclos_fullpaytouri(3, 4, root)?, 604 date: r.try_get_timestamp(5)?.into(), 605 account_pub: r.try_get(6)?, 606 authorization_pub: r.try_get(7)?, 607 authorization_sig: r.try_get(8)?, 608 }, 609 IncomingType::map => unimplemented!("MAP are never listed in the history"), 610 }) 611 }, 612 ) 613 .await 614 } 615 616 pub async fn revenue_history( 617 db: &PgPool, 618 params: &History, 619 currency: &Currency, 620 root: &CompactString, 621 listen: impl FnOnce() -> Receiver<i64>, 622 ) -> sqlx::Result<Vec<RevenueIncomingBankTransaction>> { 623 history( 624 db, 625 "tx_in_id", 626 params, 627 listen, 628 || { 629 QueryBuilder::new( 630 " 631 SELECT 632 tx_in_id, 633 valued_at, 634 amount, 635 debit_account, 636 debit_name, 637 subject 638 FROM tx_in 639 WHERE 640 ", 641 ) 642 }, 643 |r: PgRow| { 644 Ok(RevenueIncomingBankTransaction { 645 row_id: r.try_get_u64(0)?, 646 date: r.try_get_timestamp(1)?.into(), 647 amount: r.try_get_amount(2, currency)?, 648 credit_fee: None, 649 debit_account: r.try_get_cyclos_fullpaytouri(3, 4, root)?, 650 subject: r.try_get(5)?, 651 }) 652 }, 653 ) 654 .await 655 } 656 657 pub async fn transfer_by_id( 658 db: &PgPool, 659 id: u64, 660 currency: &Currency, 661 root: &CompactString, 662 ) -> sqlx::Result<Option<TransferStatus>> { 663 serialized!( 664 sqlx::query( 665 " 666 SELECT 667 status, 668 status_msg, 669 amount, 670 exchange_base_url, 671 metadata, 672 wtid, 673 credit_account, 674 credit_name, 675 initiated_at 676 FROM transfer 677 JOIN initiated USING (initiated_id) 678 WHERE initiated_id = $1 679 ", 680 ) 681 .bind(id as i64) 682 .try_map(|r: PgRow| { 683 Ok(TransferStatus { 684 status: r.try_get(0)?, 685 status_msg: r.try_get(1)?, 686 amount: r.try_get_amount(2, currency)?, 687 origin_exchange_url: r.try_get(3)?, 688 metadata: r.try_get(4)?, 689 wtid: r.try_get(5)?, 690 credit_account: r.try_get_cyclos_fullpaytouri(6, 7, root)?, 691 timestamp: r.try_get_timestamp(8)?.into(), 692 }) 693 }) 694 .fetch_optional(db) 695 ) 696 } 697 698 /** Get a batch of pending initiated transactions not attempted since [start] */ 699 pub async fn pending_batch( 700 db: &mut PgConnection, 701 start: &Timestamp, 702 ) -> sqlx::Result<Vec<Initiated>> { 703 serialized!( 704 sqlx::query( 705 " 706 SELECT initiated_id, amount, subject, credit_account, credit_name 707 FROM initiated 708 WHERE tx_id IS NULL 709 AND status='pending' 710 AND (last_submitted IS NULL OR last_submitted < $1) 711 LIMIT 100 712 ", 713 ) 714 .bind_timestamp(start) 715 .try_map(|r: PgRow| { 716 Ok(Initiated { 717 id: r.try_get(0)?, 718 amount: r.try_get(1)?, 719 subject: r.try_get(2)?, 720 creditor_id: r.try_get(3)?, 721 creditor_name: r.try_get(4)?, 722 }) 723 }) 724 .fetch_all(&mut *db) 725 ) 726 } 727 728 /** Update status of a successful submitted initiated transaction */ 729 pub async fn initiated_submit_success( 730 db: &mut PgConnection, 731 initiated_id: i64, 732 timestamp: &Timestamp, 733 tx_id: i64, 734 ) -> sqlx::Result<()> { 735 serialized!( 736 sqlx::query( 737 " 738 UPDATE initiated 739 SET status='pending', submission_counter=submission_counter+1, last_submitted=$1, tx_id=$2 740 WHERE initiated_id=$3 741 " 742 ) 743 .bind_timestamp(timestamp) 744 .bind(tx_id) 745 .bind(initiated_id) 746 .execute(&mut *db) 747 )?; 748 Ok(()) 749 } 750 751 /** Update status of a permanently failed initiated transaction */ 752 pub async fn initiated_submit_permanent_failure( 753 db: &mut PgConnection, 754 initiated_id: i64, 755 msg: &str, 756 ) -> sqlx::Result<()> { 757 serialized!( 758 sqlx::query( 759 " 760 UPDATE initiated 761 SET status='permanent_failure', status_msg=$1 762 WHERE initiated_id=$2 763 ", 764 ) 765 .bind(msg) 766 .bind(initiated_id) 767 .execute(&mut *db) 768 )?; 769 Ok(()) 770 } 771 772 #[derive(Debug, Clone, Copy, PartialEq, Eq)] 773 pub enum ChargebackFailureResult { 774 Unknown, 775 Known(u64), 776 Idempotent(u64), 777 } 778 779 /** Update status of a charged back initiated transaction */ 780 pub async fn initiated_chargeback_failure( 781 db: &mut PgConnection, 782 transfer_id: i64, 783 ) -> sqlx::Result<ChargebackFailureResult> { 784 Ok(serialized!( 785 sqlx::query("SELECT out_initiated_id, out_new FROM register_charge_back_failure($1)") 786 .bind(transfer_id) 787 .try_map(|r: PgRow| { 788 let id = r.try_get_u64(0)?; 789 Ok(if id == 0 { 790 ChargebackFailureResult::Unknown 791 } else if r.try_get(1)? { 792 ChargebackFailureResult::Known(id) 793 } else { 794 ChargebackFailureResult::Idempotent(id) 795 }) 796 }) 797 .fetch_optional(&mut *db) 798 )? 799 .unwrap_or(ChargebackFailureResult::Unknown)) 800 } 801 802 /** Get JSON value from KV table */ 803 pub async fn kv_get<T: DeserializeOwned + Unpin + Send>( 804 db: &mut PgConnection, 805 key: &str, 806 ) -> sqlx::Result<Option<T>> { 807 serialized!( 808 sqlx::query("SELECT value FROM kv WHERE key=$1") 809 .bind(key) 810 .try_map(|r| Ok(r.try_get::<sqlx::types::Json<T>, _>(0)?.0)) 811 .fetch_optional(&mut *db) 812 ) 813 } 814 815 /** Set JSON value in KV table */ 816 pub async fn kv_set<T: Serialize>(db: &mut PgConnection, key: &str, value: &T) -> sqlx::Result<()> { 817 serialized!( 818 sqlx::query("INSERT INTO kv (key, value) VALUES ($1, $2) ON CONFLICT (key) DO UPDATE SET value=EXCLUDED.value") 819 .bind(key) 820 .bind(sqlx::types::Json(value)) 821 .execute(&mut *db) 822 )?; 823 Ok(()) 824 } 825 826 pub enum RegistrationResult { 827 Success, 828 ReservePubReuse, 829 } 830 831 pub async fn transfer_register( 832 db: &PgPool, 833 req: &RegistrationRequest, 834 ) -> sqlx::Result<RegistrationResult> { 835 let ty: IncomingType = req.r#type.into(); 836 serialized!( 837 sqlx::query( 838 "SELECT out_reserve_pub_reuse FROM register_prepared_transfers($1,$2,$3,$4,$5,$6)" 839 ) 840 .bind(ty) 841 .bind(&req.account_pub) 842 .bind(&req.authorization_pub) 843 .bind(&req.authorization_sig) 844 .bind(req.recurrent) 845 .bind_timestamp(&Timestamp::now()) 846 .try_map(|r: PgRow| { 847 Ok(if r.try_get_flag("out_reserve_pub_reuse")? { 848 RegistrationResult::ReservePubReuse 849 } else { 850 RegistrationResult::Success 851 }) 852 }) 853 .fetch_one(db) 854 ) 855 } 856 857 pub async fn transfer_unregister(db: &PgPool, req: &Unregistration) -> sqlx::Result<bool> { 858 serialized!( 859 sqlx::query("SELECT out_found FROM delete_prepared_transfers($1)") 860 .bind(&req.authorization_pub) 861 .try_map(|r: PgRow| r.try_get_flag("out_found")) 862 .fetch_one(db) 863 ) 864 } 865 866 pub trait CyclosTypeHelper { 867 fn try_get_cyclos_fullpayto<I: sqlx::ColumnIndex<Self>>( 868 &self, 869 idx: I, 870 name: I, 871 root: &CompactString, 872 ) -> sqlx::Result<FullCyclosPayto>; 873 fn try_get_cyclos_fullpaytouri<I: sqlx::ColumnIndex<Self>>( 874 &self, 875 idx: I, 876 name: I, 877 root: &CompactString, 878 ) -> sqlx::Result<PaytoURI>; 879 } 880 881 impl CyclosTypeHelper for PgRow { 882 fn try_get_cyclos_fullpayto<I: sqlx::ColumnIndex<Self>>( 883 &self, 884 idx: I, 885 name: I, 886 root: &CompactString, 887 ) -> sqlx::Result<FullCyclosPayto> { 888 let idx = self.try_get(idx)?; 889 let name = self.try_get(name)?; 890 Ok(FullCyclosPayto::new( 891 CyclosAccount { 892 id: CyclosId(idx), 893 root: root.clone(), 894 }, 895 name, 896 )) 897 } 898 fn try_get_cyclos_fullpaytouri<I: sqlx::ColumnIndex<Self>>( 899 &self, 900 idx: I, 901 name: I, 902 root: &CompactString, 903 ) -> sqlx::Result<PaytoURI> { 904 let idx = self.try_get(idx)?; 905 let name = self.try_get(name)?; 906 Ok(CyclosAccount { 907 id: CyclosId(idx), 908 root: root.clone(), 909 } 910 .as_full_uri(name)) 911 } 912 } 913 914 #[cfg(test)] 915 mod test { 916 use std::assert_matches; 917 918 use compact_str::CompactString; 919 use jiff::{Span, Timestamp}; 920 use serde_json::json; 921 use sqlx::{PgPool, Postgres, Row as _, pool::PoolConnection, postgres::PgRow}; 922 use taler_api::{ 923 db::TypeHelper, 924 notification::dummy_listen, 925 subject::{IncomingSubject, OutgoingSubject}, 926 }; 927 use taler_common::{ 928 api::{ 929 EddsaPublicKey, HashCode, ShortHashCode, 930 params::{History, Page}, 931 wire::TransferState, 932 }, 933 types::{ 934 amount::{Currency, decimal}, 935 url, 936 utils::now_sql_stable_ts, 937 }, 938 }; 939 940 use crate::{ 941 constants::CONFIG_SOURCE, 942 db::{ 943 self, AddIncomingResult, AddOutgoingResult, BounceResult, ChargebackFailureResult, 944 Transfer, TransferResult, TxIn, TxInAdmin, TxOut, TxOutKind, 945 }, 946 }; 947 948 pub const CURR: Currency = Currency::TEST; 949 pub const ROOT: CompactString = CompactString::const_new("localhost"); 950 951 async fn setup() -> (PoolConnection<Postgres>, PgPool) { 952 taler_test_utils::db::db_test_setup(CONFIG_SOURCE).await 953 } 954 955 #[tokio::test] 956 async fn kv() { 957 let (mut db, _) = setup().await; 958 959 let value = json!({ 960 "name": "Mr Smith", 961 "no way": 32 962 }); 963 964 assert_eq!( 965 db::kv_get::<serde_json::Value>(&mut db, "value") 966 .await 967 .unwrap(), 968 None 969 ); 970 db::kv_set(&mut db, "value", &value).await.unwrap(); 971 db::kv_set(&mut db, "value", &value).await.unwrap(); 972 assert_eq!( 973 db::kv_get::<serde_json::Value>(&mut db, "value") 974 .await 975 .unwrap(), 976 Some(value) 977 ); 978 } 979 980 #[tokio::test] 981 async fn tx_in() { 982 let (mut db, pool) = setup().await; 983 984 let mut routine = async |first: &Option<IncomingSubject>, 985 second: &Option<IncomingSubject>| { 986 let id = sqlx::query("SELECT count(*) + 1 FROM tx_in") 987 .try_map(|r: PgRow| r.try_get_u64(0)) 988 .fetch_one(&mut *db) 989 .await 990 .unwrap(); 991 let now = now_sql_stable_ts(); 992 let later = now + Span::new().hours(2); 993 let tx = TxIn { 994 transfer_id: now.as_microsecond() as i64, 995 tx_id: None, 996 amount: decimal("10"), 997 subject: "subject".to_owned(), 998 debtor_id: 31000163100000000, 999 debtor_name: "Name".into(), 1000 valued_at: now, 1001 }; 1002 // Insert 1003 assert_eq!( 1004 db::register_tx_in(&mut db, &tx, first, &now) 1005 .await 1006 .expect("register tx in"), 1007 AddIncomingResult::Success { 1008 new: true, 1009 pending: false, 1010 row_id: id, 1011 valued_at: now, 1012 } 1013 ); 1014 // Idempotent 1015 assert_eq!( 1016 db::register_tx_in( 1017 &mut db, 1018 &TxIn { 1019 valued_at: later, 1020 ..tx.clone() 1021 }, 1022 first, 1023 &now 1024 ) 1025 .await 1026 .expect("register tx in"), 1027 AddIncomingResult::Success { 1028 new: false, 1029 pending: false, 1030 row_id: id, 1031 valued_at: now 1032 } 1033 ); 1034 // Many 1035 assert_eq!( 1036 db::register_tx_in( 1037 &mut db, 1038 &TxIn { 1039 transfer_id: later.as_microsecond() as i64, 1040 valued_at: later, 1041 ..tx 1042 }, 1043 second, 1044 &now 1045 ) 1046 .await 1047 .expect("register tx in"), 1048 AddIncomingResult::Success { 1049 new: true, 1050 pending: false, 1051 row_id: id + 1, 1052 valued_at: later 1053 } 1054 ); 1055 }; 1056 1057 // Empty db 1058 assert_eq!( 1059 db::revenue_history(&pool, &History::default(), &CURR, &ROOT, dummy_listen) 1060 .await 1061 .unwrap(), 1062 Vec::new() 1063 ); 1064 assert_eq!( 1065 db::incoming_history(&pool, &History::default(), &CURR, &ROOT, dummy_listen) 1066 .await 1067 .unwrap(), 1068 Vec::new() 1069 ); 1070 1071 // Regular transaction 1072 routine(&None, &None).await; 1073 1074 // Reserve transaction 1075 routine( 1076 &Some(IncomingSubject::Reserve(EddsaPublicKey::rand())), 1077 &Some(IncomingSubject::Reserve(EddsaPublicKey::rand())), 1078 ) 1079 .await; 1080 1081 // Kyc transaction 1082 routine( 1083 &Some(IncomingSubject::Kyc(EddsaPublicKey::rand())), 1084 &Some(IncomingSubject::Kyc(EddsaPublicKey::rand())), 1085 ) 1086 .await; 1087 1088 // History 1089 assert_eq!( 1090 db::revenue_history(&pool, &History::default(), &CURR, &ROOT, dummy_listen) 1091 .await 1092 .unwrap() 1093 .len(), 1094 6 1095 ); 1096 assert_eq!( 1097 db::incoming_history(&pool, &History::default(), &CURR, &ROOT, dummy_listen) 1098 .await 1099 .unwrap() 1100 .len(), 1101 4 1102 ); 1103 } 1104 1105 #[tokio::test] 1106 async fn tx_in_admin() { 1107 let (_, pool) = setup().await; 1108 1109 // Empty db 1110 assert_eq!( 1111 db::incoming_history(&pool, &History::default(), &CURR, &ROOT, dummy_listen) 1112 .await 1113 .unwrap(), 1114 Vec::new() 1115 ); 1116 1117 let now = now_sql_stable_ts(); 1118 let later = now + Span::new().hours(2); 1119 let tx = TxInAdmin { 1120 amount: decimal("10"), 1121 subject: "subject".to_owned(), 1122 debtor_id: 31000163100000000, 1123 debtor_name: "Name".into(), 1124 metadata: IncomingSubject::Reserve(EddsaPublicKey::rand()), 1125 }; 1126 // Insert 1127 assert_eq!( 1128 db::register_tx_in_admin(&pool, &tx, &now) 1129 .await 1130 .expect("register tx in"), 1131 AddIncomingResult::Success { 1132 new: true, 1133 pending: false, 1134 row_id: 1, 1135 valued_at: now 1136 } 1137 ); 1138 // Many 1139 assert_eq!( 1140 db::register_tx_in_admin( 1141 &pool, 1142 &TxInAdmin { 1143 subject: "Other".to_owned(), 1144 metadata: IncomingSubject::Reserve(EddsaPublicKey::rand()), 1145 ..tx.clone() 1146 }, 1147 &later 1148 ) 1149 .await 1150 .expect("register tx in"), 1151 AddIncomingResult::Success { 1152 new: true, 1153 pending: false, 1154 row_id: 2, 1155 valued_at: later 1156 } 1157 ); 1158 1159 // History 1160 assert_eq!( 1161 db::incoming_history(&pool, &History::default(), &CURR, &ROOT, dummy_listen) 1162 .await 1163 .unwrap() 1164 .len(), 1165 2 1166 ); 1167 } 1168 1169 #[tokio::test] 1170 async fn tx_out() { 1171 let (mut db, pool) = setup().await; 1172 1173 let mut routine = async |first: &TxOutKind, second: &TxOutKind| { 1174 let transfer_id = sqlx::query("SELECT count(*) + 1 FROM tx_out") 1175 .try_map(|r: PgRow| r.try_get(0)) 1176 .fetch_one(&mut *db) 1177 .await 1178 .unwrap(); 1179 let now = now_sql_stable_ts(); 1180 let later = now + Span::new().hours(2); 1181 let tx = TxOut { 1182 transfer_id, 1183 tx_id: Some(transfer_id), 1184 amount: decimal("10"), 1185 subject: "subject".to_owned(), 1186 creditor_id: 31000163100000000, 1187 creditor_name: "Name".into(), 1188 valued_at: now, 1189 }; 1190 assert_matches!( 1191 db::make_transfer( 1192 &pool, 1193 &Transfer { 1194 request_uid: HashCode::rand(), 1195 amount: decimal("10"), 1196 exchange_base_url: url("https://exchange.test.com/"), 1197 metadata: None, 1198 wtid: ShortHashCode::rand(), 1199 creditor_id: 31000163100000000, 1200 creditor_name: "Name".into() 1201 }, 1202 &now 1203 ) 1204 .await 1205 .unwrap(), 1206 TransferResult::Success { .. } 1207 ); 1208 db::initiated_submit_success(&mut db, 1, &Timestamp::now(), transfer_id) 1209 .await 1210 .expect("status success"); 1211 1212 // Insert 1213 assert_eq!( 1214 db::register_tx_out(&mut db, &tx, first, &now) 1215 .await 1216 .expect("register tx out"), 1217 AddOutgoingResult { 1218 result: db::RegisterResult::known, 1219 row_id: transfer_id, 1220 } 1221 ); 1222 // Idempotent 1223 assert_eq!( 1224 db::register_tx_out( 1225 &mut db, 1226 &TxOut { 1227 valued_at: later, 1228 ..tx.clone() 1229 }, 1230 first, 1231 &now 1232 ) 1233 .await 1234 .expect("register tx out"), 1235 AddOutgoingResult { 1236 result: db::RegisterResult::idempotent, 1237 row_id: transfer_id, 1238 } 1239 ); 1240 // Recovered 1241 assert_eq!( 1242 db::register_tx_out( 1243 &mut db, 1244 &TxOut { 1245 transfer_id: transfer_id + 1, 1246 tx_id: Some(transfer_id + 1), 1247 valued_at: later, 1248 ..tx.clone() 1249 }, 1250 second, 1251 &now 1252 ) 1253 .await 1254 .expect("register tx out"), 1255 AddOutgoingResult { 1256 result: db::RegisterResult::recovered, 1257 row_id: transfer_id + 1, 1258 } 1259 ); 1260 }; 1261 1262 // Empty db 1263 assert_eq!( 1264 db::outgoing_history(&pool, &History::default(), &CURR, &ROOT, dummy_listen) 1265 .await 1266 .unwrap(), 1267 Vec::new() 1268 ); 1269 1270 // Regular transaction 1271 routine(&TxOutKind::Simple, &TxOutKind::Simple).await; 1272 1273 // Talerable transaction 1274 routine( 1275 &TxOutKind::Talerable(OutgoingSubject::rand()), 1276 &TxOutKind::Talerable(OutgoingSubject::rand()), 1277 ) 1278 .await; 1279 1280 // Bounced transaction 1281 routine(&TxOutKind::Bounce(21), &TxOutKind::Bounce(42)).await; 1282 1283 // History 1284 assert_eq!( 1285 db::outgoing_history(&pool, &History::default(), &CURR, &ROOT, dummy_listen) 1286 .await 1287 .unwrap() 1288 .len(), 1289 2 1290 ); 1291 } 1292 1293 // TODO tx out failure 1294 1295 #[tokio::test] 1296 async fn transfer() { 1297 let (_, pool) = setup().await; 1298 1299 // Empty db 1300 assert_eq!( 1301 db::transfer_by_id(&pool, 0, &CURR, &ROOT).await.unwrap(), 1302 None 1303 ); 1304 assert_eq!( 1305 db::transfer_page(&pool, &None, &CURR, &ROOT, &Page::default()) 1306 .await 1307 .unwrap(), 1308 Vec::new() 1309 ); 1310 1311 let req = Transfer { 1312 request_uid: HashCode::rand(), 1313 amount: decimal("10"), 1314 exchange_base_url: url("https://exchange.test.com/"), 1315 metadata: None, 1316 wtid: ShortHashCode::rand(), 1317 creditor_id: 31000163100000000, 1318 creditor_name: "Name".into(), 1319 }; 1320 let now = now_sql_stable_ts(); 1321 let later = now + Span::new().hours(2); 1322 // Insert 1323 assert_eq!( 1324 db::make_transfer(&pool, &req, &now) 1325 .await 1326 .expect("transfer"), 1327 TransferResult::Success { 1328 id: 1, 1329 initiated_at: now 1330 } 1331 ); 1332 // Idempotent 1333 assert_eq!( 1334 db::make_transfer(&pool, &req, &later) 1335 .await 1336 .expect("transfer"), 1337 TransferResult::Success { 1338 id: 1, 1339 initiated_at: now 1340 } 1341 ); 1342 // Request UID reuse 1343 assert_eq!( 1344 db::make_transfer( 1345 &pool, 1346 &Transfer { 1347 wtid: ShortHashCode::rand(), 1348 ..req.clone() 1349 }, 1350 &now 1351 ) 1352 .await 1353 .expect("transfer"), 1354 TransferResult::RequestUidReuse 1355 ); 1356 // wtid reuse 1357 assert_eq!( 1358 db::make_transfer( 1359 &pool, 1360 &Transfer { 1361 request_uid: HashCode::rand(), 1362 ..req.clone() 1363 }, 1364 &now 1365 ) 1366 .await 1367 .expect("transfer"), 1368 TransferResult::WtidReuse 1369 ); 1370 // Many 1371 assert_eq!( 1372 db::make_transfer( 1373 &pool, 1374 &Transfer { 1375 request_uid: HashCode::rand(), 1376 wtid: ShortHashCode::rand(), 1377 ..req 1378 }, 1379 &later 1380 ) 1381 .await 1382 .expect("transfer"), 1383 TransferResult::Success { 1384 id: 2, 1385 initiated_at: later 1386 } 1387 ); 1388 1389 // Get 1390 assert!( 1391 db::transfer_by_id(&pool, 1, &CURR, &ROOT) 1392 .await 1393 .unwrap() 1394 .is_some() 1395 ); 1396 assert!( 1397 db::transfer_by_id(&pool, 2, &CURR, &ROOT) 1398 .await 1399 .unwrap() 1400 .is_some() 1401 ); 1402 assert!( 1403 db::transfer_by_id(&pool, 3, &CURR, &ROOT) 1404 .await 1405 .unwrap() 1406 .is_none() 1407 ); 1408 assert_eq!( 1409 db::transfer_page(&pool, &None, &CURR, &ROOT, &Page::default()) 1410 .await 1411 .unwrap() 1412 .len(), 1413 2 1414 ); 1415 } 1416 1417 #[tokio::test] 1418 async fn bounce() { 1419 let (mut db, _) = setup().await; 1420 1421 let amount = decimal("10"); 1422 let now = now_sql_stable_ts(); 1423 1424 // Bounce 1425 assert_eq!( 1426 db::register_bounced_tx_in( 1427 &mut db, 1428 &TxIn { 1429 transfer_id: 12, 1430 tx_id: None, 1431 amount, 1432 subject: "subject".to_owned(), 1433 debtor_id: 31000163100000000, 1434 debtor_name: "Name".into(), 1435 valued_at: now 1436 }, 1437 22, 1438 "good reason", 1439 &now 1440 ) 1441 .await 1442 .expect("bounce"), 1443 BounceResult { 1444 tx_id: 1, 1445 tx_new: true 1446 } 1447 ); 1448 // Idempotent 1449 assert_eq!( 1450 db::register_bounced_tx_in( 1451 &mut db, 1452 &TxIn { 1453 transfer_id: 12, 1454 tx_id: None, 1455 amount, 1456 subject: "subject".to_owned(), 1457 debtor_id: 31000163100000000, 1458 debtor_name: "Name".into(), 1459 valued_at: now 1460 }, 1461 22, 1462 "good reason", 1463 &now 1464 ) 1465 .await 1466 .expect("bounce"), 1467 BounceResult { 1468 tx_id: 1, 1469 tx_new: false 1470 } 1471 ); 1472 1473 // Many 1474 assert_eq!( 1475 db::register_bounced_tx_in( 1476 &mut db, 1477 &TxIn { 1478 transfer_id: 13, 1479 tx_id: None, 1480 amount, 1481 subject: "subject".to_owned(), 1482 debtor_id: 31000163100000000, 1483 debtor_name: "Name".into(), 1484 valued_at: now 1485 }, 1486 23, 1487 "good reason", 1488 &now 1489 ) 1490 .await 1491 .expect("bounce"), 1492 BounceResult { 1493 tx_id: 2, 1494 tx_new: true 1495 } 1496 ); 1497 } 1498 1499 #[tokio::test] 1500 async fn status() { 1501 let (mut db, pool) = setup().await; 1502 1503 let check_status = async |id: u64, status: TransferState, msg: Option<&str>| { 1504 let transfer = db::transfer_by_id(&pool, id, &CURR, &ROOT) 1505 .await 1506 .unwrap() 1507 .unwrap(); 1508 assert_eq!( 1509 (status, msg), 1510 (transfer.status, transfer.status_msg.as_deref()) 1511 ); 1512 }; 1513 1514 // Unknown transfer 1515 db::initiated_submit_permanent_failure(&mut db, 1, "msg") 1516 .await 1517 .unwrap(); 1518 db::initiated_submit_success(&mut db, 1, &Timestamp::now(), 12) 1519 .await 1520 .unwrap(); 1521 assert_eq!( 1522 db::initiated_chargeback_failure(&mut db, 1).await.unwrap(), 1523 ChargebackFailureResult::Unknown 1524 ); 1525 1526 // Failure 1527 db::make_transfer( 1528 &pool, 1529 &Transfer { 1530 request_uid: HashCode::rand(), 1531 amount: decimal("1"), 1532 exchange_base_url: url("https://exchange.test.com/"), 1533 metadata: None, 1534 wtid: ShortHashCode::rand(), 1535 creditor_id: 31000163100000000, 1536 creditor_name: "Name".into(), 1537 }, 1538 &Timestamp::now(), 1539 ) 1540 .await 1541 .expect("transfer"); 1542 check_status(1, TransferState::pending, None).await; 1543 db::initiated_submit_permanent_failure(&mut db, 1, "error status") 1544 .await 1545 .unwrap(); 1546 check_status(1, TransferState::permanent_failure, Some("error status")).await; 1547 1548 // Success 1549 db::make_transfer( 1550 &pool, 1551 &Transfer { 1552 request_uid: HashCode::rand(), 1553 amount: decimal("1"), 1554 exchange_base_url: url("https://exchange.test.com/"), 1555 metadata: None, 1556 wtid: ShortHashCode::rand(), 1557 creditor_id: 31000163100000000, 1558 creditor_name: "Name".into(), 1559 }, 1560 &Timestamp::now(), 1561 ) 1562 .await 1563 .expect("transfer"); 1564 check_status(2, TransferState::pending, None).await; 1565 db::initiated_submit_success(&mut db, 2, &Timestamp::now(), 3) 1566 .await 1567 .unwrap(); 1568 check_status(2, TransferState::pending, None).await; 1569 db::register_tx_out( 1570 &mut db, 1571 &TxOut { 1572 transfer_id: 5, 1573 tx_id: Some(3), 1574 amount: decimal("2"), 1575 subject: "".to_string(), 1576 creditor_id: 31000163100000000, 1577 creditor_name: "Name".into(), 1578 valued_at: Timestamp::now(), 1579 }, 1580 &TxOutKind::Simple, 1581 &Timestamp::now(), 1582 ) 1583 .await 1584 .unwrap(); 1585 check_status(2, TransferState::success, None).await; 1586 1587 // Chargeback 1588 assert_eq!( 1589 db::initiated_chargeback_failure(&mut db, 5).await.unwrap(), 1590 ChargebackFailureResult::Known(2) 1591 ); 1592 check_status(2, TransferState::late_failure, Some("charged back")).await; 1593 assert_eq!( 1594 db::initiated_chargeback_failure(&mut db, 5).await.unwrap(), 1595 ChargebackFailureResult::Idempotent(2) 1596 ); 1597 } 1598 1599 #[tokio::test] 1600 async fn batch() { 1601 let (mut db, pool) = setup().await; 1602 let start = Timestamp::now(); 1603 1604 // Empty db 1605 let pendings = db::pending_batch(&mut db, &start) 1606 .await 1607 .expect("pending_batch"); 1608 assert_eq!(pendings.len(), 0); 1609 1610 // Some transfers 1611 for i in 0..3 { 1612 db::make_transfer( 1613 &pool, 1614 &Transfer { 1615 request_uid: HashCode::rand(), 1616 amount: decimal(format!("{}", i + 1)), 1617 exchange_base_url: url("https://exchange.test.com/"), 1618 metadata: None, 1619 wtid: ShortHashCode::rand(), 1620 creditor_id: 31000163100000000, 1621 creditor_name: "Name".into(), 1622 }, 1623 &Timestamp::now(), 1624 ) 1625 .await 1626 .expect("transfer"); 1627 } 1628 let pendings = db::pending_batch(&mut db, &start) 1629 .await 1630 .expect("pending_batch"); 1631 assert_eq!(pendings.len(), 3); 1632 1633 // Max 100 txs in batch 1634 for i in 0..100 { 1635 db::make_transfer( 1636 &pool, 1637 &Transfer { 1638 request_uid: HashCode::rand(), 1639 amount: decimal(format!("{}", i + 1)), 1640 exchange_base_url: url("https://exchange.test.com/"), 1641 metadata: None, 1642 wtid: ShortHashCode::rand(), 1643 creditor_id: 31000163100000000, 1644 creditor_name: "Name".into(), 1645 }, 1646 &Timestamp::now(), 1647 ) 1648 .await 1649 .expect("transfer"); 1650 } 1651 let pendings = db::pending_batch(&mut db, &start) 1652 .await 1653 .expect("pending_batch"); 1654 assert_eq!(pendings.len(), 100); 1655 1656 // Skip uploaded 1657 for i in 0..=10 { 1658 db::initiated_submit_success(&mut db, i, &Timestamp::now(), i) 1659 .await 1660 .expect("status success"); 1661 } 1662 let pendings = db::pending_batch(&mut db, &start) 1663 .await 1664 .expect("pending_batch"); 1665 assert_eq!(pendings.len(), 93); 1666 1667 // Skip failed 1668 for i in 0..=10 { 1669 db::initiated_submit_permanent_failure(&mut db, 10 + i, "failure") 1670 .await 1671 .expect("status failure"); 1672 } 1673 let pendings = db::pending_batch(&mut db, &start) 1674 .await 1675 .expect("pending_batch"); 1676 assert_eq!(pendings.len(), 83); 1677 } 1678 }