db.rs (49129B)
1 /* 2 This file is part of TALER 3 Copyright (C) 2025, 2026 Taler Systems SA 4 5 TALER is free software; you can redistribute it and/or modify it under the 6 terms of the GNU Affero General Public License as published by the Free Software 7 Foundation; either version 3, or (at your option) any later version. 8 9 TALER is distributed in the hope that it will be useful, but WITHOUT ANY 10 WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR 11 A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. 12 13 You should have received a copy of the GNU Affero General Public License along with 14 TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> 15 */ 16 17 use std::fmt::Display; 18 19 use compact_str::CompactString; 20 use jiff::Timestamp; 21 use serde::{Serialize, de::DeserializeOwned}; 22 use sqlx::{PgConnection, PgPool, QueryBuilder, Row, postgres::PgRow}; 23 use taler_api::{ 24 db::{BindHelper, TypeHelper, history, page}, 25 serialized, 26 subject::{IncomingSubject, OutgoingSubject, fmt_out_subject}, 27 }; 28 use taler_common::{ 29 api::{ 30 HashCode, ShortHashCode, 31 params::{History, Page}, 32 prepared::{RegistrationRequest, Unregistration}, 33 revenue::RevenueIncomingBankTransaction, 34 wire::{ 35 IncomingBankTransaction, OutgoingBankTransaction, TransferListStatus, TransferState, 36 TransferStatus, 37 }, 38 }, 39 config::Config, 40 db::IncomingType, 41 types::{ 42 amount::{Currency, Decimal}, 43 payto::{PaytoImpl as _, PaytoURI}, 44 }, 45 }; 46 use tokio::sync::watch::{Receiver, Sender}; 47 use url::Url; 48 49 use crate::{ 50 config::parse_db_cfg, 51 payto::{CyclosAccount, CyclosId, FullCyclosPayto}, 52 }; 53 54 const SCHEMA: &str = "cyclos"; 55 56 pub async fn pool(cfg: &Config) -> anyhow::Result<PgPool> { 57 let db = parse_db_cfg(cfg)?; 58 let pool = taler_common::db::pool(db.cfg, SCHEMA).await?; 59 Ok(pool) 60 } 61 62 pub async fn dbinit(cfg: &Config, reset: bool) -> anyhow::Result<PgPool> { 63 let db_cfg = parse_db_cfg(cfg)?; 64 let pool = taler_common::db::pool(db_cfg.cfg, SCHEMA).await?; 65 let mut db = pool.acquire().await?; 66 taler_common::db::dbinit(&mut db, db_cfg.sql_dir.as_ref(), SCHEMA, reset).await?; 67 Ok(pool) 68 } 69 70 pub async fn notification_listener( 71 pool: PgPool, 72 in_channel: Sender<i64>, 73 taler_in_channel: Sender<i64>, 74 out_channel: Sender<i64>, 75 taler_out_channel: Sender<i64>, 76 ) { 77 taler_api::notification::notification_listener!(&pool, 78 "tx_in" => (row_id: i64) { 79 in_channel.send_replace(row_id); 80 }, 81 "taler_in" => (row_id: i64) { 82 taler_in_channel.send_replace(row_id); 83 }, 84 "tx_out" => (row_id: i64) { 85 out_channel.send_replace(row_id); 86 }, 87 "taler_out" => (row_id: i64) { 88 taler_out_channel.send_replace(row_id); 89 } 90 ) 91 } 92 93 #[derive(Debug, Clone)] 94 pub struct TxIn { 95 pub transfer_id: i64, 96 pub tx_id: Option<i64>, 97 pub amount: Decimal, 98 pub subject: String, 99 pub debtor_id: i64, 100 pub debtor_name: CompactString, 101 pub valued_at: Timestamp, 102 } 103 104 impl Display for TxIn { 105 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { 106 let Self { 107 transfer_id, 108 tx_id, 109 amount, 110 subject, 111 valued_at, 112 debtor_id, 113 debtor_name, 114 } = self; 115 let tx_id = match tx_id { 116 Some(id) => format_args!(":{}", *id), 117 None => format_args!(""), 118 }; 119 write!( 120 f, 121 "{valued_at} {transfer_id}{tx_id} {amount} ({debtor_id} {debtor_name}) '{subject}'" 122 ) 123 } 124 } 125 126 #[derive(Debug, Clone)] 127 pub struct TxOut { 128 pub transfer_id: i64, 129 pub tx_id: Option<i64>, 130 pub amount: Decimal, 131 pub subject: String, 132 pub creditor_id: i64, 133 pub creditor_name: CompactString, 134 pub valued_at: Timestamp, 135 } 136 137 impl Display for TxOut { 138 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { 139 let Self { 140 transfer_id, 141 tx_id, 142 amount, 143 subject, 144 creditor_id, 145 creditor_name, 146 valued_at, 147 } = self; 148 let tx_id = match tx_id { 149 Some(id) => format_args!(":{}", *id), 150 None => format_args!(""), 151 }; 152 write!( 153 f, 154 "{valued_at} {transfer_id}{tx_id} {amount} ({creditor_id} {creditor_name}) '{subject}'" 155 ) 156 } 157 } 158 159 #[derive(Debug, PartialEq, Eq)] 160 pub struct Initiated { 161 pub id: i64, 162 pub amount: Decimal, 163 pub subject: String, 164 pub creditor_id: i64, 165 pub creditor_name: CompactString, 166 } 167 168 impl Display for Initiated { 169 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { 170 let Self { 171 id, 172 amount, 173 subject, 174 creditor_id, 175 creditor_name, 176 } = self; 177 write!( 178 f, 179 "{id} {amount} ({creditor_id} {creditor_name}) '{subject}'" 180 ) 181 } 182 } 183 184 #[derive(Debug, Clone)] 185 pub struct TxInAdmin { 186 pub amount: Decimal, 187 pub subject: String, 188 pub debtor_id: i64, 189 pub debtor_name: CompactString, 190 pub metadata: IncomingSubject, 191 } 192 193 #[derive(Debug, PartialEq, Eq)] 194 pub enum AddIncomingResult { 195 Success { 196 new: bool, 197 pending: bool, 198 row_id: u64, 199 valued_at: Timestamp, 200 }, 201 ReservePubReuse, 202 UnknownMapping, 203 MappingReuse, 204 } 205 206 /// Lock the database for worker execution 207 pub async fn worker_lock(e: &mut PgConnection) -> sqlx::Result<bool> { 208 sqlx::query("SELECT pg_try_advisory_lock(42)") 209 .try_map(|r: PgRow| r.try_get(0)) 210 .fetch_one(e) 211 .await 212 } 213 214 pub async fn register_tx_in_admin( 215 db: &PgPool, 216 tx: &TxInAdmin, 217 now: &Timestamp, 218 ) -> sqlx::Result<AddIncomingResult> { 219 serialized!( 220 sqlx::query( 221 " 222 SELECT out_reserve_pub_reuse, out_mapping_reuse, out_unknown_mapping, out_tx_row_id, out_valued_at, out_new, out_pending 223 FROM register_tx_in(NULL, NULL, $1, $2, $3, $4, $5, $6, $7, $5) 224 ", 225 ) 226 .bind(tx.amount) 227 .bind(&tx.subject) 228 .bind(tx.debtor_id) 229 .bind(&tx.debtor_name) 230 .bind_timestamp(now) 231 .bind(tx.metadata.ty()) 232 .bind(tx.metadata.key()) 233 .try_map(|r: PgRow| { 234 Ok(if r.try_get_flag(0)? { 235 AddIncomingResult::ReservePubReuse 236 } else if r.try_get_flag(1)? { 237 AddIncomingResult::MappingReuse 238 } else if r.try_get_flag(2)? { 239 AddIncomingResult::UnknownMapping 240 } else { 241 AddIncomingResult::Success { 242 row_id: r.try_get_u64(3)?, 243 valued_at: r.try_get_timestamp(4)?, 244 new: r.try_get(5)?, 245 pending: r.try_get(6)? 246 } 247 }) 248 }) 249 .fetch_one(db) 250 ) 251 } 252 253 pub async fn register_tx_in( 254 db: &mut PgConnection, 255 tx: &TxIn, 256 subject: &Option<IncomingSubject>, 257 now: &Timestamp, 258 ) -> sqlx::Result<AddIncomingResult> { 259 serialized!( 260 sqlx::query( 261 " 262 SELECT out_reserve_pub_reuse, out_mapping_reuse, out_unknown_mapping, out_tx_row_id, out_valued_at, out_new, out_pending 263 FROM register_tx_in($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) 264 ", 265 ) 266 .bind(tx.transfer_id) 267 .bind(tx.tx_id) 268 .bind(tx.amount) 269 .bind(&tx.subject) 270 .bind(tx.debtor_id) 271 .bind(&tx.debtor_name) 272 .bind(tx.valued_at.as_microsecond()) 273 .bind(subject.as_ref().map(|it| it.ty())) 274 .bind(subject.as_ref().map(|it| it.key())) 275 .bind(now.as_microsecond()) 276 .try_map(|r: PgRow| { 277 Ok(if r.try_get_flag(0)? { 278 AddIncomingResult::ReservePubReuse 279 } else if r.try_get_flag(1)? { 280 AddIncomingResult::MappingReuse 281 } else if r.try_get_flag(2)? { 282 AddIncomingResult::UnknownMapping 283 } else { 284 AddIncomingResult::Success { 285 row_id: r.try_get_u64(3)?, 286 valued_at: r.try_get_timestamp(4)?, 287 new: r.try_get(5)?, 288 pending: r.try_get(6)? 289 } 290 }) 291 }) 292 .fetch_one(&mut *db) 293 ) 294 } 295 296 #[derive(Debug)] 297 pub enum TxOutKind { 298 Simple, 299 Bounce(i64), 300 Talerable(OutgoingSubject), 301 } 302 303 #[derive(Debug, Clone, Copy, PartialEq, Eq, sqlx::Type)] 304 #[allow(non_camel_case_types)] 305 #[sqlx(type_name = "register_result")] 306 pub enum RegisterResult { 307 /// Already registered 308 idempotent, 309 /// Initiated transaction 310 known, 311 /// Recovered unknown outgoing transaction 312 recovered, 313 } 314 315 #[derive(Debug, PartialEq, Eq)] 316 pub struct AddOutgoingResult { 317 pub result: RegisterResult, 318 pub row_id: i64, 319 } 320 321 pub async fn register_tx_out( 322 db: &mut PgConnection, 323 tx: &TxOut, 324 kind: &TxOutKind, 325 now: &Timestamp, 326 ) -> sqlx::Result<AddOutgoingResult> { 327 serialized!({ 328 let query = sqlx::query( 329 " 330 SELECT out_result, out_tx_row_id 331 FROM register_tx_out($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12) 332 ", 333 ) 334 .bind(tx.transfer_id) 335 .bind(tx.tx_id) 336 .bind(tx.amount) 337 .bind(&tx.subject) 338 .bind(tx.creditor_id) 339 .bind(&tx.creditor_name) 340 .bind_timestamp(&tx.valued_at); 341 let query = match kind { 342 TxOutKind::Simple => query 343 .bind(None::<&[u8]>) 344 .bind(None::<&str>) 345 .bind(None::<&str>) 346 .bind(None::<i64>), 347 TxOutKind::Bounce(bounced) => query 348 .bind(None::<&[u8]>) 349 .bind(None::<&str>) 350 .bind(None::<&str>) 351 .bind(*bounced), 352 TxOutKind::Talerable(subject) => query 353 .bind(&subject.wtid) 354 .bind(subject.exchange_base_url.as_str()) 355 .bind(&subject.metadata) 356 .bind(None::<i64>), 357 }; 358 query 359 .bind_timestamp(now) 360 .try_map(|r: PgRow| { 361 Ok(AddOutgoingResult { 362 result: r.try_get(0)?, 363 row_id: r.try_get(1)?, 364 }) 365 }) 366 .fetch_one(&mut *db) 367 }) 368 } 369 370 #[derive(Debug, PartialEq, Eq)] 371 pub enum TransferResult { 372 Success { id: u64, initiated_at: Timestamp }, 373 RequestUidReuse, 374 WtidReuse, 375 } 376 377 #[derive(Debug, Clone)] 378 pub struct Transfer { 379 pub request_uid: HashCode, 380 pub amount: Decimal, 381 pub exchange_base_url: Url, 382 pub metadata: Option<CompactString>, 383 pub wtid: ShortHashCode, 384 pub creditor_id: i64, 385 pub creditor_name: CompactString, 386 } 387 388 pub async fn make_transfer( 389 db: &PgPool, 390 tx: &Transfer, 391 now: &Timestamp, 392 ) -> sqlx::Result<TransferResult> { 393 let subject = fmt_out_subject(&tx.wtid, &tx.exchange_base_url, tx.metadata.as_deref()); 394 serialized!( 395 sqlx::query( 396 " 397 SELECT out_request_uid_reuse, out_wtid_reuse, out_initiated_row_id, out_initiated_at 398 FROM taler_transfer($1, $2, $3, $4, $5, $6, $7, $8, $9) 399 ", 400 ) 401 .bind(&tx.request_uid) 402 .bind(&tx.wtid) 403 .bind(&subject) 404 .bind(tx.amount) 405 .bind(tx.exchange_base_url.as_str()) 406 .bind(&tx.metadata) 407 .bind(tx.creditor_id) 408 .bind(&tx.creditor_name) 409 .bind_timestamp(now) 410 .try_map(|r: PgRow| { 411 Ok(if r.try_get_flag(0)? { 412 TransferResult::RequestUidReuse 413 } else if r.try_get_flag(1)? { 414 TransferResult::WtidReuse 415 } else { 416 TransferResult::Success { 417 id: r.try_get_u64(2)?, 418 initiated_at: r.try_get_timestamp(3)?, 419 } 420 }) 421 }) 422 .fetch_one(db) 423 ) 424 } 425 426 #[derive(Debug, PartialEq, Eq)] 427 pub struct BounceResult { 428 pub tx_id: u64, 429 pub tx_new: bool, 430 } 431 432 pub async fn register_bounced_tx_in( 433 db: &mut PgConnection, 434 tx: &TxIn, 435 chargeback_id: i64, 436 reason: &str, 437 now: &Timestamp, 438 ) -> sqlx::Result<BounceResult> { 439 serialized!( 440 sqlx::query( 441 " 442 SELECT out_tx_row_id, out_tx_new 443 FROM register_bounced_tx_in($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) 444 ", 445 ) 446 .bind(tx.transfer_id) 447 .bind(tx.tx_id) 448 .bind(tx.amount) 449 .bind(&tx.subject) 450 .bind(tx.debtor_id) 451 .bind(&tx.debtor_name) 452 .bind_timestamp(&tx.valued_at) 453 .bind(chargeback_id) 454 .bind(reason) 455 .bind_timestamp(now) 456 .try_map(|r: PgRow| { 457 Ok(BounceResult { 458 tx_id: r.try_get_u64(0)?, 459 tx_new: r.try_get(1)?, 460 }) 461 }) 462 .fetch_one(&mut *db) 463 ) 464 } 465 466 pub async fn transfer_page( 467 db: &PgPool, 468 status: &Option<TransferState>, 469 currency: &Currency, 470 root: &CompactString, 471 params: &Page, 472 ) -> sqlx::Result<Vec<TransferListStatus>> { 473 page( 474 db, 475 params, 476 "initiated_id", 477 || { 478 let mut builder = QueryBuilder::new( 479 " 480 SELECT 481 initiated_id, 482 status, 483 amount, 484 credit_account, 485 credit_name, 486 initiated_at 487 FROM transfer 488 JOIN initiated USING (initiated_id) 489 WHERE 490 ", 491 ); 492 if let Some(status) = status { 493 builder.push(" status = ").push_bind(status).push(" AND "); 494 } 495 builder 496 }, 497 |r: PgRow| { 498 Ok(TransferListStatus { 499 row_id: r.try_get_u64(0)?, 500 status: r.try_get(1)?, 501 amount: r.try_get_amount(2, currency)?, 502 credit_account: r.try_get_cyclos_fullpaytouri(3, 4, root)?, 503 timestamp: r.try_get_timestamp(5)?.into(), 504 }) 505 }, 506 ) 507 .await 508 } 509 510 pub async fn outgoing_history( 511 db: &PgPool, 512 params: &History, 513 currency: &Currency, 514 root: &CompactString, 515 listen: impl FnOnce() -> Receiver<i64>, 516 ) -> sqlx::Result<Vec<OutgoingBankTransaction>> { 517 history( 518 db, 519 "tx_out_id", 520 params, 521 listen, 522 || { 523 QueryBuilder::new( 524 " 525 SELECT 526 tx_out_id, 527 amount, 528 credit_account, 529 credit_name, 530 valued_at, 531 exchange_base_url, 532 metadata, 533 wtid 534 FROM taler_out 535 JOIN tx_out USING (tx_out_id) 536 WHERE 537 ", 538 ) 539 }, 540 |r: PgRow| { 541 Ok(OutgoingBankTransaction { 542 row_id: r.try_get_u64(0)?, 543 amount: r.try_get_amount(1, currency)?, 544 debit_fee: None, 545 credit_account: r.try_get_cyclos_fullpaytouri(2, 3, root)?, 546 date: r.try_get_timestamp(4)?.into(), 547 exchange_base_url: r.try_get_url(5)?, 548 metadata: r.try_get(6)?, 549 wtid: r.try_get(7)?, 550 }) 551 }, 552 ) 553 .await 554 } 555 556 pub async fn incoming_history( 557 db: &PgPool, 558 params: &History, 559 currency: &Currency, 560 root: &CompactString, 561 listen: impl FnOnce() -> Receiver<i64>, 562 ) -> sqlx::Result<Vec<IncomingBankTransaction>> { 563 history( 564 db, 565 "tx_in_id", 566 params, 567 listen, 568 || { 569 QueryBuilder::new( 570 " 571 SELECT 572 type, 573 tx_in_id, 574 amount, 575 debit_account, 576 debit_name, 577 valued_at, 578 metadata, 579 authorization_pub, 580 authorization_sig 581 FROM taler_in 582 JOIN tx_in USING (tx_in_id) 583 WHERE 584 ", 585 ) 586 }, 587 |r: PgRow| { 588 Ok(match r.try_get(0)? { 589 IncomingType::reserve => IncomingBankTransaction::Reserve { 590 row_id: r.try_get_u64(1)?, 591 amount: r.try_get_amount(2, currency)?, 592 credit_fee: None, 593 debit_account: r.try_get_cyclos_fullpaytouri(3, 4, root)?, 594 date: r.try_get_timestamp(5)?.into(), 595 reserve_pub: r.try_get(6)?, 596 authorization_pub: r.try_get(7)?, 597 authorization_sig: r.try_get(8)?, 598 }, 599 IncomingType::kyc => IncomingBankTransaction::Kyc { 600 row_id: r.try_get_u64(1)?, 601 amount: r.try_get_amount(2, currency)?, 602 credit_fee: None, 603 debit_account: r.try_get_cyclos_fullpaytouri(3, 4, root)?, 604 date: r.try_get_timestamp(5)?.into(), 605 account_pub: r.try_get(6)?, 606 authorization_pub: r.try_get(7)?, 607 authorization_sig: r.try_get(8)?, 608 }, 609 IncomingType::map => unimplemented!("MAP are never listed in the history"), 610 }) 611 }, 612 ) 613 .await 614 } 615 616 pub async fn revenue_history( 617 db: &PgPool, 618 params: &History, 619 currency: &Currency, 620 root: &CompactString, 621 listen: impl FnOnce() -> Receiver<i64>, 622 ) -> sqlx::Result<Vec<RevenueIncomingBankTransaction>> { 623 history( 624 db, 625 "tx_in_id", 626 params, 627 listen, 628 || { 629 QueryBuilder::new( 630 " 631 SELECT 632 tx_in_id, 633 valued_at, 634 amount, 635 debit_account, 636 debit_name, 637 subject 638 FROM tx_in 639 WHERE 640 ", 641 ) 642 }, 643 |r: PgRow| { 644 Ok(RevenueIncomingBankTransaction { 645 row_id: r.try_get_u64(0)?, 646 date: r.try_get_timestamp(1)?.into(), 647 amount: r.try_get_amount(2, currency)?, 648 credit_fee: None, 649 debit_account: r.try_get_cyclos_fullpaytouri(3, 4, root)?, 650 subject: r.try_get(5)?, 651 }) 652 }, 653 ) 654 .await 655 } 656 657 pub async fn transfer_by_id( 658 db: &PgPool, 659 id: u64, 660 currency: &Currency, 661 root: &CompactString, 662 ) -> sqlx::Result<Option<TransferStatus>> { 663 serialized!( 664 sqlx::query( 665 " 666 SELECT 667 status, 668 status_msg, 669 amount, 670 exchange_base_url, 671 metadata, 672 wtid, 673 credit_account, 674 credit_name, 675 initiated_at 676 FROM transfer 677 JOIN initiated USING (initiated_id) 678 WHERE initiated_id = $1 679 ", 680 ) 681 .bind(id as i64) 682 .try_map(|r: PgRow| { 683 Ok(TransferStatus { 684 status: r.try_get(0)?, 685 status_msg: r.try_get(1)?, 686 amount: r.try_get_amount(2, currency)?, 687 origin_exchange_url: r.try_get(3)?, 688 metadata: r.try_get(4)?, 689 wtid: r.try_get(5)?, 690 credit_account: r.try_get_cyclos_fullpaytouri(6, 7, root)?, 691 timestamp: r.try_get_timestamp(8)?.into(), 692 }) 693 }) 694 .fetch_optional(db) 695 ) 696 } 697 698 /** Get a batch of pending initiated transactions not attempted since [start] */ 699 pub async fn pending_batch( 700 db: &mut PgConnection, 701 start: &Timestamp, 702 ) -> sqlx::Result<Vec<Initiated>> { 703 serialized!( 704 sqlx::query( 705 " 706 SELECT initiated_id, amount, subject, credit_account, credit_name 707 FROM initiated 708 WHERE tx_id IS NULL 709 AND status='pending' 710 AND (last_submitted IS NULL OR last_submitted < $1) 711 LIMIT 100 712 ", 713 ) 714 .bind_timestamp(start) 715 .try_map(|r: PgRow| { 716 Ok(Initiated { 717 id: r.try_get(0)?, 718 amount: r.try_get(1)?, 719 subject: r.try_get(2)?, 720 creditor_id: r.try_get(3)?, 721 creditor_name: r.try_get(4)?, 722 }) 723 }) 724 .fetch_all(&mut *db) 725 ) 726 } 727 728 /** Update status of a successful submitted initiated transaction */ 729 pub async fn initiated_submit_success( 730 db: &mut PgConnection, 731 initiated_id: i64, 732 timestamp: &Timestamp, 733 tx_id: i64, 734 ) -> sqlx::Result<()> { 735 serialized!( 736 sqlx::query( 737 " 738 UPDATE initiated 739 SET status='pending', submission_counter=submission_counter+1, last_submitted=$1, tx_id=$2 740 WHERE initiated_id=$3 741 " 742 ) 743 .bind_timestamp(timestamp) 744 .bind(tx_id) 745 .bind(initiated_id) 746 .execute(&mut *db) 747 )?; 748 Ok(()) 749 } 750 751 /** Update status of a permanently failed initiated transaction */ 752 pub async fn initiated_submit_permanent_failure( 753 db: &mut PgConnection, 754 initiated_id: i64, 755 msg: &str, 756 ) -> sqlx::Result<()> { 757 serialized!( 758 sqlx::query( 759 " 760 UPDATE initiated 761 SET status='permanent_failure', status_msg=$1 762 WHERE initiated_id=$2 763 ", 764 ) 765 .bind(msg) 766 .bind(initiated_id) 767 .execute(&mut *db) 768 )?; 769 Ok(()) 770 } 771 772 #[derive(Debug, Clone, Copy, PartialEq, Eq)] 773 pub enum ChargebackFailureResult { 774 Unknown, 775 Known(u64), 776 Idempotent(u64), 777 } 778 779 /** Update status of a charged back initiated transaction */ 780 pub async fn initiated_chargeback_failure( 781 db: &mut PgConnection, 782 transfer_id: i64, 783 ) -> sqlx::Result<ChargebackFailureResult> { 784 Ok(serialized!( 785 sqlx::query("SELECT out_initiated_id, out_new FROM register_charge_back_failure($1)") 786 .bind(transfer_id) 787 .try_map(|r: PgRow| { 788 let id = r.try_get_u64(0)?; 789 Ok(if id == 0 { 790 ChargebackFailureResult::Unknown 791 } else if r.try_get(1)? { 792 ChargebackFailureResult::Known(id) 793 } else { 794 ChargebackFailureResult::Idempotent(id) 795 }) 796 }) 797 .fetch_optional(&mut *db) 798 )? 799 .unwrap_or(ChargebackFailureResult::Unknown)) 800 } 801 802 /** Get JSON value from KV table */ 803 pub async fn kv_get<T: DeserializeOwned + Unpin + Send>( 804 db: &mut PgConnection, 805 key: &str, 806 ) -> sqlx::Result<Option<T>> { 807 serialized!( 808 sqlx::query("SELECT value FROM kv WHERE key=$1") 809 .bind(key) 810 .try_map(|r| Ok(r.try_get::<sqlx::types::Json<T>, _>(0)?.0)) 811 .fetch_optional(&mut *db) 812 ) 813 } 814 815 /** Set JSON value in KV table */ 816 pub async fn kv_set<T: Serialize>(db: &mut PgConnection, key: &str, value: &T) -> sqlx::Result<()> { 817 serialized!( 818 sqlx::query("INSERT INTO kv (key, value) VALUES ($1, $2) ON CONFLICT (key) DO UPDATE SET value=EXCLUDED.value") 819 .bind(key) 820 .bind(sqlx::types::Json(value)) 821 .execute(&mut *db) 822 )?; 823 Ok(()) 824 } 825 826 pub enum RegistrationResult { 827 Success, 828 ReservePubReuse, 829 } 830 831 pub async fn transfer_register( 832 db: &PgPool, 833 req: &RegistrationRequest, 834 ) -> sqlx::Result<RegistrationResult> { 835 let ty: IncomingType = req.r#type.into(); 836 serialized!( 837 sqlx::query( 838 "SELECT out_reserve_pub_reuse FROM register_prepared_transfers($1,$2,$3,$4,$5,$6)" 839 ) 840 .bind(ty) 841 .bind(&req.account_pub) 842 .bind(&req.authorization_pub) 843 .bind(&req.authorization_sig) 844 .bind(req.recurrent) 845 .bind_timestamp(&Timestamp::now()) 846 .try_map(|r: PgRow| { 847 Ok(if r.try_get_flag("out_reserve_pub_reuse")? { 848 RegistrationResult::ReservePubReuse 849 } else { 850 RegistrationResult::Success 851 }) 852 }) 853 .fetch_one(db) 854 ) 855 } 856 857 pub async fn transfer_unregister(db: &PgPool, req: &Unregistration) -> sqlx::Result<bool> { 858 serialized!( 859 sqlx::query("SELECT out_found FROM delete_prepared_transfers($1)") 860 .bind(&req.authorization_pub) 861 .try_map(|r: PgRow| r.try_get_flag("out_found")) 862 .fetch_one(db) 863 ) 864 } 865 866 pub trait CyclosTypeHelper { 867 fn try_get_cyclos_fullpayto<I: sqlx::ColumnIndex<Self>>( 868 &self, 869 idx: I, 870 name: I, 871 root: &CompactString, 872 ) -> sqlx::Result<FullCyclosPayto>; 873 fn try_get_cyclos_fullpaytouri<I: sqlx::ColumnIndex<Self>>( 874 &self, 875 idx: I, 876 name: I, 877 root: &CompactString, 878 ) -> sqlx::Result<PaytoURI>; 879 } 880 881 impl CyclosTypeHelper for PgRow { 882 fn try_get_cyclos_fullpayto<I: sqlx::ColumnIndex<Self>>( 883 &self, 884 idx: I, 885 name: I, 886 root: &CompactString, 887 ) -> sqlx::Result<FullCyclosPayto> { 888 let idx = self.try_get(idx)?; 889 let name = self.try_get(name)?; 890 Ok(FullCyclosPayto::new( 891 CyclosAccount { 892 id: CyclosId(idx), 893 root: root.clone(), 894 }, 895 name, 896 )) 897 } 898 fn try_get_cyclos_fullpaytouri<I: sqlx::ColumnIndex<Self>>( 899 &self, 900 idx: I, 901 name: I, 902 root: &CompactString, 903 ) -> sqlx::Result<PaytoURI> { 904 let idx = self.try_get(idx)?; 905 let name = self.try_get(name)?; 906 Ok(CyclosAccount { 907 id: CyclosId(idx), 908 root: root.clone(), 909 } 910 .as_full_uri(name)) 911 } 912 } 913 914 #[cfg(test)] 915 mod test { 916 use std::assert_matches; 917 918 use compact_str::CompactString; 919 use jiff::{Span, Timestamp}; 920 use serde_json::json; 921 use sqlx::{PgPool, Postgres, Row as _, pool::PoolConnection, postgres::PgRow}; 922 use taler_api::{ 923 db::TypeHelper, 924 notification::dummy_listen, 925 subject::{IncomingSubject, OutgoingSubject}, 926 }; 927 use taler_common::{ 928 api::{ 929 EddsaPublicKey, HashCode, ShortHashCode, 930 params::{History, Page}, 931 wire::TransferState, 932 }, 933 types::{ 934 amount::{Currency, decimal}, 935 url, 936 utils::now_sql_stable_ts, 937 }, 938 }; 939 940 use crate::{ 941 constants::CONFIG_SOURCE, 942 db::{ 943 self, AddIncomingResult, AddOutgoingResult, BounceResult, ChargebackFailureResult, 944 Transfer, TransferResult, TxIn, TxInAdmin, TxOut, TxOutKind, 945 }, 946 }; 947 948 pub const CURR: Currency = Currency::TEST; 949 pub const ROOT: CompactString = CompactString::const_new("localhost"); 950 951 async fn setup() -> (PoolConnection<Postgres>, PgPool) { 952 taler_test_utils::db::db_test_setup(CONFIG_SOURCE).await 953 } 954 955 #[tokio::test] 956 async fn kv() { 957 let (mut db, _) = setup().await; 958 959 let value = json!({ 960 "name": "Mr Smith", 961 "no way": 32 962 }); 963 964 assert_eq!( 965 db::kv_get::<serde_json::Value>(&mut db, "value") 966 .await 967 .unwrap(), 968 None 969 ); 970 db::kv_set(&mut db, "value", &value).await.unwrap(); 971 db::kv_set(&mut db, "value", &value).await.unwrap(); 972 assert_eq!( 973 db::kv_get::<serde_json::Value>(&mut db, "value") 974 .await 975 .unwrap(), 976 Some(value) 977 ); 978 } 979 980 #[tokio::test] 981 async fn tx_in() { 982 let (mut db, pool) = setup().await; 983 984 let mut routine = async |first: &Option<IncomingSubject>, 985 second: &Option<IncomingSubject>| { 986 let id = sqlx::query("SELECT count(*) + 1 FROM tx_in") 987 .try_map(|r: PgRow| r.try_get_u64(0)) 988 .fetch_one(&mut *db) 989 .await 990 .unwrap(); 991 let now = now_sql_stable_ts(); 992 let later = now + Span::new().hours(2); 993 let tx = TxIn { 994 transfer_id: now.as_microsecond() as i64, 995 tx_id: None, 996 amount: decimal("10"), 997 subject: "subject".to_owned(), 998 debtor_id: 31000163100000000, 999 debtor_name: "Name".into(), 1000 valued_at: now, 1001 }; 1002 // Insert 1003 assert_eq!( 1004 db::register_tx_in(&mut db, &tx, first, &now) 1005 .await 1006 .expect("register tx in"), 1007 AddIncomingResult::Success { 1008 new: true, 1009 pending: false, 1010 row_id: id, 1011 valued_at: now, 1012 } 1013 ); 1014 // Idempotent 1015 assert_eq!( 1016 db::register_tx_in( 1017 &mut db, 1018 &TxIn { 1019 valued_at: later, 1020 ..tx.clone() 1021 }, 1022 first, 1023 &now 1024 ) 1025 .await 1026 .expect("register tx in"), 1027 AddIncomingResult::Success { 1028 new: false, 1029 pending: false, 1030 row_id: id, 1031 valued_at: now 1032 } 1033 ); 1034 // Many 1035 assert_eq!( 1036 db::register_tx_in( 1037 &mut db, 1038 &TxIn { 1039 transfer_id: later.as_microsecond() as i64, 1040 valued_at: later, 1041 ..tx 1042 }, 1043 second, 1044 &now 1045 ) 1046 .await 1047 .expect("register tx in"), 1048 AddIncomingResult::Success { 1049 new: true, 1050 pending: false, 1051 row_id: id + 1, 1052 valued_at: later 1053 } 1054 ); 1055 }; 1056 1057 // Empty db 1058 assert_eq!( 1059 db::revenue_history(&pool, &History::default(), &CURR, &ROOT, dummy_listen) 1060 .await 1061 .unwrap(), 1062 Vec::new() 1063 ); 1064 assert_eq!( 1065 db::incoming_history(&pool, &History::default(), &CURR, &ROOT, dummy_listen) 1066 .await 1067 .unwrap(), 1068 Vec::new() 1069 ); 1070 1071 // Regular transaction 1072 routine(&None, &None).await; 1073 1074 let first = EddsaPublicKey::rand(); 1075 let second = EddsaPublicKey::rand(); 1076 1077 // Reserve transaction 1078 routine( 1079 &Some(IncomingSubject::Reserve(first.clone())), 1080 &Some(IncomingSubject::Reserve(second)), 1081 ) 1082 .await; 1083 1084 // Kyc transaction 1085 routine( 1086 &Some(IncomingSubject::Kyc(first.clone())), 1087 &Some(IncomingSubject::Kyc(first)), 1088 ) 1089 .await; 1090 1091 // History 1092 assert_eq!( 1093 db::revenue_history(&pool, &History::default(), &CURR, &ROOT, dummy_listen) 1094 .await 1095 .unwrap() 1096 .len(), 1097 6 1098 ); 1099 assert_eq!( 1100 db::incoming_history(&pool, &History::default(), &CURR, &ROOT, dummy_listen) 1101 .await 1102 .unwrap() 1103 .len(), 1104 4 1105 ); 1106 } 1107 1108 #[tokio::test] 1109 async fn tx_in_admin() { 1110 let (_, pool) = setup().await; 1111 1112 // Empty db 1113 assert_eq!( 1114 db::incoming_history(&pool, &History::default(), &CURR, &ROOT, dummy_listen) 1115 .await 1116 .unwrap(), 1117 Vec::new() 1118 ); 1119 1120 let now = now_sql_stable_ts(); 1121 let later = now + Span::new().hours(2); 1122 let tx = TxInAdmin { 1123 amount: decimal("10"), 1124 subject: "subject".to_owned(), 1125 debtor_id: 31000163100000000, 1126 debtor_name: "Name".into(), 1127 metadata: IncomingSubject::Reserve(EddsaPublicKey::rand()), 1128 }; 1129 // Insert 1130 assert_eq!( 1131 db::register_tx_in_admin(&pool, &tx, &now) 1132 .await 1133 .expect("register tx in"), 1134 AddIncomingResult::Success { 1135 new: true, 1136 pending: false, 1137 row_id: 1, 1138 valued_at: now 1139 } 1140 ); 1141 // Many 1142 assert_eq!( 1143 db::register_tx_in_admin( 1144 &pool, 1145 &TxInAdmin { 1146 subject: "Other".to_owned(), 1147 metadata: IncomingSubject::Reserve(EddsaPublicKey::rand()), 1148 ..tx.clone() 1149 }, 1150 &later 1151 ) 1152 .await 1153 .expect("register tx in"), 1154 AddIncomingResult::Success { 1155 new: true, 1156 pending: false, 1157 row_id: 2, 1158 valued_at: later 1159 } 1160 ); 1161 1162 // History 1163 assert_eq!( 1164 db::incoming_history(&pool, &History::default(), &CURR, &ROOT, dummy_listen) 1165 .await 1166 .unwrap() 1167 .len(), 1168 2 1169 ); 1170 } 1171 1172 #[tokio::test] 1173 async fn tx_out() { 1174 let (mut db, pool) = setup().await; 1175 1176 let mut routine = async |first: &TxOutKind, second: &TxOutKind| { 1177 let transfer_id = sqlx::query("SELECT count(*) + 1 FROM tx_out") 1178 .try_map(|r: PgRow| r.try_get(0)) 1179 .fetch_one(&mut *db) 1180 .await 1181 .unwrap(); 1182 let now = now_sql_stable_ts(); 1183 let later = now + Span::new().hours(2); 1184 let tx = TxOut { 1185 transfer_id, 1186 tx_id: Some(transfer_id), 1187 amount: decimal("10"), 1188 subject: "subject".to_owned(), 1189 creditor_id: 31000163100000000, 1190 creditor_name: "Name".into(), 1191 valued_at: now, 1192 }; 1193 assert_matches!( 1194 db::make_transfer( 1195 &pool, 1196 &Transfer { 1197 request_uid: HashCode::rand(), 1198 amount: decimal("10"), 1199 exchange_base_url: url("https://exchange.test.com/"), 1200 metadata: None, 1201 wtid: ShortHashCode::rand(), 1202 creditor_id: 31000163100000000, 1203 creditor_name: "Name".into() 1204 }, 1205 &now 1206 ) 1207 .await 1208 .unwrap(), 1209 TransferResult::Success { .. } 1210 ); 1211 db::initiated_submit_success(&mut db, 1, &Timestamp::now(), transfer_id) 1212 .await 1213 .expect("status success"); 1214 1215 // Insert 1216 assert_eq!( 1217 db::register_tx_out(&mut db, &tx, first, &now) 1218 .await 1219 .expect("register tx out"), 1220 AddOutgoingResult { 1221 result: db::RegisterResult::known, 1222 row_id: transfer_id, 1223 } 1224 ); 1225 // Idempotent 1226 assert_eq!( 1227 db::register_tx_out( 1228 &mut db, 1229 &TxOut { 1230 valued_at: later, 1231 ..tx.clone() 1232 }, 1233 first, 1234 &now 1235 ) 1236 .await 1237 .expect("register tx out"), 1238 AddOutgoingResult { 1239 result: db::RegisterResult::idempotent, 1240 row_id: transfer_id, 1241 } 1242 ); 1243 // Recovered 1244 assert_eq!( 1245 db::register_tx_out( 1246 &mut db, 1247 &TxOut { 1248 transfer_id: transfer_id + 1, 1249 tx_id: Some(transfer_id + 1), 1250 valued_at: later, 1251 ..tx.clone() 1252 }, 1253 second, 1254 &now 1255 ) 1256 .await 1257 .expect("register tx out"), 1258 AddOutgoingResult { 1259 result: db::RegisterResult::recovered, 1260 row_id: transfer_id + 1, 1261 } 1262 ); 1263 }; 1264 1265 // Empty db 1266 assert_eq!( 1267 db::outgoing_history(&pool, &History::default(), &CURR, &ROOT, dummy_listen) 1268 .await 1269 .unwrap(), 1270 Vec::new() 1271 ); 1272 1273 // Regular transaction 1274 routine(&TxOutKind::Simple, &TxOutKind::Simple).await; 1275 1276 // Talerable transaction 1277 routine( 1278 &TxOutKind::Talerable(OutgoingSubject::rand()), 1279 &TxOutKind::Talerable(OutgoingSubject::rand()), 1280 ) 1281 .await; 1282 1283 // Bounced transaction 1284 routine(&TxOutKind::Bounce(21), &TxOutKind::Bounce(42)).await; 1285 1286 // History 1287 assert_eq!( 1288 db::outgoing_history(&pool, &History::default(), &CURR, &ROOT, dummy_listen) 1289 .await 1290 .unwrap() 1291 .len(), 1292 2 1293 ); 1294 } 1295 1296 // TODO tx out failure 1297 1298 #[tokio::test] 1299 async fn transfer() { 1300 let (_, pool) = setup().await; 1301 1302 // Empty db 1303 assert_eq!( 1304 db::transfer_by_id(&pool, 0, &CURR, &ROOT).await.unwrap(), 1305 None 1306 ); 1307 assert_eq!( 1308 db::transfer_page(&pool, &None, &CURR, &ROOT, &Page::default()) 1309 .await 1310 .unwrap(), 1311 Vec::new() 1312 ); 1313 1314 let req = Transfer { 1315 request_uid: HashCode::rand(), 1316 amount: decimal("10"), 1317 exchange_base_url: url("https://exchange.test.com/"), 1318 metadata: None, 1319 wtid: ShortHashCode::rand(), 1320 creditor_id: 31000163100000000, 1321 creditor_name: "Name".into(), 1322 }; 1323 let now = now_sql_stable_ts(); 1324 let later = now + Span::new().hours(2); 1325 // Insert 1326 assert_eq!( 1327 db::make_transfer(&pool, &req, &now) 1328 .await 1329 .expect("transfer"), 1330 TransferResult::Success { 1331 id: 1, 1332 initiated_at: now 1333 } 1334 ); 1335 // Idempotent 1336 assert_eq!( 1337 db::make_transfer(&pool, &req, &later) 1338 .await 1339 .expect("transfer"), 1340 TransferResult::Success { 1341 id: 1, 1342 initiated_at: now 1343 } 1344 ); 1345 // Request UID reuse 1346 assert_eq!( 1347 db::make_transfer( 1348 &pool, 1349 &Transfer { 1350 wtid: ShortHashCode::rand(), 1351 ..req.clone() 1352 }, 1353 &now 1354 ) 1355 .await 1356 .expect("transfer"), 1357 TransferResult::RequestUidReuse 1358 ); 1359 // wtid reuse 1360 assert_eq!( 1361 db::make_transfer( 1362 &pool, 1363 &Transfer { 1364 request_uid: HashCode::rand(), 1365 ..req.clone() 1366 }, 1367 &now 1368 ) 1369 .await 1370 .expect("transfer"), 1371 TransferResult::WtidReuse 1372 ); 1373 // Many 1374 assert_eq!( 1375 db::make_transfer( 1376 &pool, 1377 &Transfer { 1378 request_uid: HashCode::rand(), 1379 wtid: ShortHashCode::rand(), 1380 ..req 1381 }, 1382 &later 1383 ) 1384 .await 1385 .expect("transfer"), 1386 TransferResult::Success { 1387 id: 2, 1388 initiated_at: later 1389 } 1390 ); 1391 1392 // Get 1393 assert!( 1394 db::transfer_by_id(&pool, 1, &CURR, &ROOT) 1395 .await 1396 .unwrap() 1397 .is_some() 1398 ); 1399 assert!( 1400 db::transfer_by_id(&pool, 2, &CURR, &ROOT) 1401 .await 1402 .unwrap() 1403 .is_some() 1404 ); 1405 assert!( 1406 db::transfer_by_id(&pool, 3, &CURR, &ROOT) 1407 .await 1408 .unwrap() 1409 .is_none() 1410 ); 1411 assert_eq!( 1412 db::transfer_page(&pool, &None, &CURR, &ROOT, &Page::default()) 1413 .await 1414 .unwrap() 1415 .len(), 1416 2 1417 ); 1418 } 1419 1420 #[tokio::test] 1421 async fn bounce() { 1422 let (mut db, _) = setup().await; 1423 1424 let amount = decimal("10"); 1425 let now = now_sql_stable_ts(); 1426 1427 // Bounce 1428 assert_eq!( 1429 db::register_bounced_tx_in( 1430 &mut db, 1431 &TxIn { 1432 transfer_id: 12, 1433 tx_id: None, 1434 amount, 1435 subject: "subject".to_owned(), 1436 debtor_id: 31000163100000000, 1437 debtor_name: "Name".into(), 1438 valued_at: now 1439 }, 1440 22, 1441 "good reason", 1442 &now 1443 ) 1444 .await 1445 .expect("bounce"), 1446 BounceResult { 1447 tx_id: 1, 1448 tx_new: true 1449 } 1450 ); 1451 // Idempotent 1452 assert_eq!( 1453 db::register_bounced_tx_in( 1454 &mut db, 1455 &TxIn { 1456 transfer_id: 12, 1457 tx_id: None, 1458 amount, 1459 subject: "subject".to_owned(), 1460 debtor_id: 31000163100000000, 1461 debtor_name: "Name".into(), 1462 valued_at: now 1463 }, 1464 22, 1465 "good reason", 1466 &now 1467 ) 1468 .await 1469 .expect("bounce"), 1470 BounceResult { 1471 tx_id: 1, 1472 tx_new: false 1473 } 1474 ); 1475 1476 // Many 1477 assert_eq!( 1478 db::register_bounced_tx_in( 1479 &mut db, 1480 &TxIn { 1481 transfer_id: 13, 1482 tx_id: None, 1483 amount, 1484 subject: "subject".to_owned(), 1485 debtor_id: 31000163100000000, 1486 debtor_name: "Name".into(), 1487 valued_at: now 1488 }, 1489 23, 1490 "good reason", 1491 &now 1492 ) 1493 .await 1494 .expect("bounce"), 1495 BounceResult { 1496 tx_id: 2, 1497 tx_new: true 1498 } 1499 ); 1500 } 1501 1502 #[tokio::test] 1503 async fn status() { 1504 let (mut db, pool) = setup().await; 1505 1506 let check_status = async |id: u64, status: TransferState, msg: Option<&str>| { 1507 let transfer = db::transfer_by_id(&pool, id, &CURR, &ROOT) 1508 .await 1509 .unwrap() 1510 .unwrap(); 1511 assert_eq!( 1512 (status, msg), 1513 (transfer.status, transfer.status_msg.as_deref()) 1514 ); 1515 }; 1516 1517 // Unknown transfer 1518 db::initiated_submit_permanent_failure(&mut db, 1, "msg") 1519 .await 1520 .unwrap(); 1521 db::initiated_submit_success(&mut db, 1, &Timestamp::now(), 12) 1522 .await 1523 .unwrap(); 1524 assert_eq!( 1525 db::initiated_chargeback_failure(&mut db, 1).await.unwrap(), 1526 ChargebackFailureResult::Unknown 1527 ); 1528 1529 // Failure 1530 db::make_transfer( 1531 &pool, 1532 &Transfer { 1533 request_uid: HashCode::rand(), 1534 amount: decimal("1"), 1535 exchange_base_url: url("https://exchange.test.com/"), 1536 metadata: None, 1537 wtid: ShortHashCode::rand(), 1538 creditor_id: 31000163100000000, 1539 creditor_name: "Name".into(), 1540 }, 1541 &Timestamp::now(), 1542 ) 1543 .await 1544 .expect("transfer"); 1545 check_status(1, TransferState::pending, None).await; 1546 db::initiated_submit_permanent_failure(&mut db, 1, "error status") 1547 .await 1548 .unwrap(); 1549 check_status(1, TransferState::permanent_failure, Some("error status")).await; 1550 1551 // Success 1552 db::make_transfer( 1553 &pool, 1554 &Transfer { 1555 request_uid: HashCode::rand(), 1556 amount: decimal("1"), 1557 exchange_base_url: url("https://exchange.test.com/"), 1558 metadata: None, 1559 wtid: ShortHashCode::rand(), 1560 creditor_id: 31000163100000000, 1561 creditor_name: "Name".into(), 1562 }, 1563 &Timestamp::now(), 1564 ) 1565 .await 1566 .expect("transfer"); 1567 check_status(2, TransferState::pending, None).await; 1568 db::initiated_submit_success(&mut db, 2, &Timestamp::now(), 3) 1569 .await 1570 .unwrap(); 1571 check_status(2, TransferState::pending, None).await; 1572 db::register_tx_out( 1573 &mut db, 1574 &TxOut { 1575 transfer_id: 5, 1576 tx_id: Some(3), 1577 amount: decimal("2"), 1578 subject: "".to_string(), 1579 creditor_id: 31000163100000000, 1580 creditor_name: "Name".into(), 1581 valued_at: Timestamp::now(), 1582 }, 1583 &TxOutKind::Simple, 1584 &Timestamp::now(), 1585 ) 1586 .await 1587 .unwrap(); 1588 check_status(2, TransferState::success, None).await; 1589 1590 // Chargeback 1591 assert_eq!( 1592 db::initiated_chargeback_failure(&mut db, 5).await.unwrap(), 1593 ChargebackFailureResult::Known(2) 1594 ); 1595 check_status(2, TransferState::late_failure, Some("charged back")).await; 1596 assert_eq!( 1597 db::initiated_chargeback_failure(&mut db, 5).await.unwrap(), 1598 ChargebackFailureResult::Idempotent(2) 1599 ); 1600 } 1601 1602 #[tokio::test] 1603 async fn batch() { 1604 let (mut db, pool) = setup().await; 1605 let start = Timestamp::now(); 1606 1607 // Empty db 1608 let pendings = db::pending_batch(&mut db, &start) 1609 .await 1610 .expect("pending_batch"); 1611 assert_eq!(pendings.len(), 0); 1612 1613 // Some transfers 1614 for i in 0..3 { 1615 db::make_transfer( 1616 &pool, 1617 &Transfer { 1618 request_uid: HashCode::rand(), 1619 amount: decimal(format!("{}", i + 1)), 1620 exchange_base_url: url("https://exchange.test.com/"), 1621 metadata: None, 1622 wtid: ShortHashCode::rand(), 1623 creditor_id: 31000163100000000, 1624 creditor_name: "Name".into(), 1625 }, 1626 &Timestamp::now(), 1627 ) 1628 .await 1629 .expect("transfer"); 1630 } 1631 let pendings = db::pending_batch(&mut db, &start) 1632 .await 1633 .expect("pending_batch"); 1634 assert_eq!(pendings.len(), 3); 1635 1636 // Max 100 txs in batch 1637 for i in 0..100 { 1638 db::make_transfer( 1639 &pool, 1640 &Transfer { 1641 request_uid: HashCode::rand(), 1642 amount: decimal(format!("{}", i + 1)), 1643 exchange_base_url: url("https://exchange.test.com/"), 1644 metadata: None, 1645 wtid: ShortHashCode::rand(), 1646 creditor_id: 31000163100000000, 1647 creditor_name: "Name".into(), 1648 }, 1649 &Timestamp::now(), 1650 ) 1651 .await 1652 .expect("transfer"); 1653 } 1654 let pendings = db::pending_batch(&mut db, &start) 1655 .await 1656 .expect("pending_batch"); 1657 assert_eq!(pendings.len(), 100); 1658 1659 // Skip uploaded 1660 for i in 0..=10 { 1661 db::initiated_submit_success(&mut db, i, &Timestamp::now(), i) 1662 .await 1663 .expect("status success"); 1664 } 1665 let pendings = db::pending_batch(&mut db, &start) 1666 .await 1667 .expect("pending_batch"); 1668 assert_eq!(pendings.len(), 93); 1669 1670 // Skip failed 1671 for i in 0..=10 { 1672 db::initiated_submit_permanent_failure(&mut db, 10 + i, "failure") 1673 .await 1674 .expect("status failure"); 1675 } 1676 let pendings = db::pending_batch(&mut db, &start) 1677 .await 1678 .expect("pending_batch"); 1679 assert_eq!(pendings.len(), 83); 1680 } 1681 }