taler-rust

GNU Taler code in Rust. Largely core banking integrations.
Log | Files | Refs | Submodules | README | LICENSE

db.rs (50719B)


      1 /*
      2   This file is part of TALER
      3   Copyright (C) 2025, 2026 Taler Systems SA
      4 
      5   TALER is free software; you can redistribute it and/or modify it under the
      6   terms of the GNU Affero General Public License as published by the Free Software
      7   Foundation; either version 3, or (at your option) any later version.
      8 
      9   TALER is distributed in the hope that it will be useful, but WITHOUT ANY
     10   WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
     11   A PARTICULAR PURPOSE.  See the GNU Affero General Public License for more details.
     12 
     13   You should have received a copy of the GNU Affero General Public License along with
     14   TALER; see the file COPYING.  If not, see <http://www.gnu.org/licenses/>
     15 */
     16 
     17 use std::fmt::Display;
     18 
     19 use compact_str::CompactString;
     20 use jiff::{Timestamp, civil::Date, tz::TimeZone};
     21 use serde::{Serialize, de::DeserializeOwned};
     22 use sqlx::{PgConnection, PgPool, QueryBuilder, Row, postgres::PgRow};
     23 use taler_api::{
     24     db::{BindHelper, TypeHelper, history, page},
     25     serialized,
     26     subject::{IncomingSubject, OutgoingSubject, fmt_out_subject},
     27 };
     28 use taler_common::{
     29     api_common::{HashCode, ShortHashCode},
     30     api_params::{History, Page},
     31     api_revenue::RevenueIncomingBankTransaction,
     32     api_transfer::{RegistrationRequest, Unregistration},
     33     api_wire::{
     34         IncomingBankTransaction, OutgoingBankTransaction, TransferListStatus, TransferState,
     35         TransferStatus,
     36     },
     37     config::Config,
     38     db::IncomingType,
     39     types::{
     40         amount::{Amount, Decimal},
     41         payto::PaytoImpl as _,
     42     },
     43 };
     44 use tokio::sync::watch::{Receiver, Sender};
     45 use url::Url;
     46 
     47 use crate::{FullHuPayto, config::parse_db_cfg, constants::CURRENCY, magnet_api::types::TxStatus};
     48 
     49 const SCHEMA: &str = "magnet_bank";
     50 
     51 pub async fn pool(cfg: &Config) -> anyhow::Result<PgPool> {
     52     let db = parse_db_cfg(cfg)?;
     53     let pool = taler_common::db::pool(db.cfg, SCHEMA).await?;
     54     Ok(pool)
     55 }
     56 
     57 pub async fn dbinit(cfg: &Config, reset: bool) -> anyhow::Result<PgPool> {
     58     let db_cfg = parse_db_cfg(cfg)?;
     59     let pool = taler_common::db::pool(db_cfg.cfg, SCHEMA).await?;
     60     let mut db = pool.acquire().await?;
     61     taler_common::db::dbinit(&mut db, db_cfg.sql_dir.as_ref(), "magnet-bank", reset).await?;
     62     Ok(pool)
     63 }
     64 
     65 pub async fn notification_listener(
     66     pool: PgPool,
     67     in_channel: Sender<i64>,
     68     taler_in_channel: Sender<i64>,
     69     out_channel: Sender<i64>,
     70     taler_out_channel: Sender<i64>,
     71 ) -> sqlx::Result<()> {
     72     taler_api::notification::notification_listener!(&pool,
     73         "tx_in" => (row_id: i64) {
     74             in_channel.send_replace(row_id);
     75         },
     76         "taler_in" => (row_id: i64) {
     77             taler_in_channel.send_replace(row_id);
     78         },
     79         "tx_out" => (row_id: i64) {
     80             out_channel.send_replace(row_id);
     81         },
     82         "taler_out" => (row_id: i64) {
     83             taler_out_channel.send_replace(row_id);
     84         }
     85     )
     86 }
     87 
     88 #[derive(Debug, Clone)]
     89 pub struct TxIn {
     90     pub code: u64,
     91     pub amount: Amount,
     92     pub subject: Box<str>,
     93     pub debtor: FullHuPayto,
     94     pub value_date: Date,
     95     pub status: TxStatus,
     96 }
     97 
     98 impl Display for TxIn {
     99     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
    100         let Self {
    101             code,
    102             amount,
    103             subject,
    104             debtor,
    105             value_date,
    106             status,
    107         } = self;
    108         write!(
    109             f,
    110             "{value_date} {code} {amount} ({} {}) {status:?} '{subject}'",
    111             debtor.bban(),
    112             debtor.name
    113         )
    114     }
    115 }
    116 
    117 #[derive(Debug, Clone)]
    118 pub struct TxOut {
    119     pub code: u64,
    120     pub amount: Amount,
    121     pub subject: Box<str>,
    122     pub creditor: FullHuPayto,
    123     pub value_date: Date,
    124     pub status: TxStatus,
    125 }
    126 
    127 impl Display for TxOut {
    128     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
    129         let Self {
    130             code,
    131             amount,
    132             subject,
    133             creditor,
    134             value_date,
    135             status,
    136         } = self;
    137         write!(
    138             f,
    139             "{value_date} {code} {amount} ({} {}) {status:?} '{subject}'",
    140             creditor.bban(),
    141             &creditor.name
    142         )
    143     }
    144 }
    145 
    146 #[derive(Debug, PartialEq, Eq)]
    147 pub struct Initiated {
    148     pub id: u64,
    149     pub amount: Amount,
    150     pub subject: Box<str>,
    151     pub creditor: FullHuPayto,
    152 }
    153 
    154 impl Display for Initiated {
    155     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
    156         let Self {
    157             id,
    158             amount,
    159             subject,
    160             creditor,
    161         } = self;
    162         write!(
    163             f,
    164             "{id} {amount} ({} {}) '{subject}'",
    165             creditor.bban(),
    166             &creditor.name
    167         )
    168     }
    169 }
    170 
    171 #[derive(Debug, Clone)]
    172 pub struct TxInAdmin {
    173     pub amount: Amount,
    174     pub subject: String,
    175     pub debtor: FullHuPayto,
    176     pub metadata: IncomingSubject,
    177 }
    178 
    179 /// Lock the database for worker execution
    180 pub async fn worker_lock(e: &mut PgConnection) -> sqlx::Result<bool> {
    181     sqlx::query("SELECT pg_try_advisory_lock(42)")
    182         .try_map(|r: PgRow| r.try_get(0))
    183         .fetch_one(e)
    184         .await
    185 }
    186 
    187 #[derive(Debug, PartialEq, Eq)]
    188 pub enum AddIncomingResult {
    189     Success {
    190         new: bool,
    191         pending: bool,
    192         row_id: u64,
    193         valued_at: Date,
    194     },
    195     ReservePubReuse,
    196     UnknownMapping,
    197     MappingReuse,
    198 }
    199 
    200 pub async fn register_tx_in_admin(
    201     db: &PgPool,
    202     tx: &TxInAdmin,
    203     now: &Timestamp,
    204 ) -> sqlx::Result<AddIncomingResult> {
    205     serialized!(
    206         sqlx::query(
    207             "
    208                 SELECT out_reserve_pub_reuse, out_mapping_reuse, out_unknown_mapping, out_tx_row_id, out_valued_at, out_new, out_pending
    209                 FROM register_tx_in(NULL, $1, $2, $3, $4, $5, $6, $7, $5)
    210             ",
    211         )
    212         .bind(&tx.amount)
    213         .bind(&tx.subject)
    214         .bind(tx.debtor.iban())
    215         .bind(&tx.debtor.name)
    216         .bind_date(&now.to_zoned(TimeZone::UTC).date())
    217         .bind(tx.metadata.ty())
    218         .bind(tx.metadata.key())
    219         .try_map(|r: PgRow| {
    220             Ok(if r.try_get_flag(0)? {
    221                 AddIncomingResult::ReservePubReuse
    222             } else if r.try_get_flag(1)? {
    223                 AddIncomingResult::MappingReuse
    224             } else if r.try_get_flag(2)? {
    225                 AddIncomingResult::UnknownMapping
    226             } else {
    227                 AddIncomingResult::Success {
    228                     row_id: r.try_get_u64(3)?,
    229                     valued_at: r.try_get_date(4)?,
    230                     new: r.try_get(5)?,
    231                     pending: r.try_get(6)?
    232                 }
    233             })
    234         })
    235         .fetch_one(db)
    236     )
    237 }
    238 
    239 pub async fn register_tx_in(
    240     db: &mut PgConnection,
    241     tx: &TxIn,
    242     subject: &Option<IncomingSubject>,
    243     now: &Timestamp,
    244 ) -> sqlx::Result<AddIncomingResult> {
    245     serialized!(
    246         sqlx::query(
    247             "
    248                 SELECT out_reserve_pub_reuse, out_mapping_reuse, out_unknown_mapping, out_tx_row_id, out_valued_at, out_new, out_pending
    249                 FROM register_tx_in($1, $2, $3, $4, $5, $6, $7, $8, $9)
    250             ",
    251         )
    252         .bind(tx.code as i64)
    253         .bind(&tx.amount)
    254         .bind(&tx.subject)
    255         .bind(tx.debtor.iban())
    256         .bind(&tx.debtor.name)
    257         .bind_date(&tx.value_date)
    258         .bind(subject.as_ref().map(|it| it.ty()))
    259         .bind(subject.as_ref().map(|it| it.key()))
    260         .bind_timestamp(now)
    261         .try_map(|r: PgRow| {
    262             Ok(if r.try_get_flag(0)? {
    263                 AddIncomingResult::ReservePubReuse
    264             } else if r.try_get_flag(1)? {
    265                 AddIncomingResult::MappingReuse
    266             } else if r.try_get_flag(2)? {
    267                 AddIncomingResult::UnknownMapping
    268             } else {
    269                 AddIncomingResult::Success {
    270                     row_id: r.try_get_u64(3)?,
    271                     valued_at: r.try_get_date(4)?,
    272                     new: r.try_get(5)?,
    273                     pending: r.try_get(6)?
    274                 }
    275             })
    276         })
    277         .fetch_one(&mut *db)
    278     )
    279 }
    280 
    281 #[derive(Debug)]
    282 pub enum TxOutKind {
    283     Simple,
    284     Bounce(u32),
    285     Talerable(OutgoingSubject),
    286 }
    287 
    288 #[derive(Debug, Clone, Copy, PartialEq, Eq, sqlx::Type)]
    289 #[allow(non_camel_case_types)]
    290 #[sqlx(type_name = "register_result")]
    291 pub enum RegisterResult {
    292     /// Already registered
    293     idempotent,
    294     /// Initiated transaction
    295     known,
    296     /// Recovered unknown outgoing transaction
    297     recovered,
    298 }
    299 
    300 #[derive(Debug, PartialEq, Eq)]
    301 pub struct AddOutgoingResult {
    302     pub result: RegisterResult,
    303     pub row_id: u64,
    304 }
    305 
    306 pub async fn register_tx_out(
    307     db: &mut PgConnection,
    308     tx: &TxOut,
    309     kind: &TxOutKind,
    310     now: &Timestamp,
    311 ) -> sqlx::Result<AddOutgoingResult> {
    312     serialized!({
    313         let query = sqlx::query(
    314             "
    315                 SELECT out_result, out_tx_row_id 
    316                 FROM register_tx_out($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
    317             ",
    318         )
    319         .bind(tx.code as i64)
    320         .bind(&tx.amount)
    321         .bind(&tx.subject)
    322         .bind(tx.creditor.iban())
    323         .bind(&tx.creditor.name)
    324         .bind_date(&tx.value_date);
    325         let query = match kind {
    326             TxOutKind::Simple => query
    327                 .bind(None::<&[u8]>)
    328                 .bind(None::<&str>)
    329                 .bind(None::<&str>)
    330                 .bind(None::<i64>),
    331             TxOutKind::Bounce(bounced) => query
    332                 .bind(None::<&[u8]>)
    333                 .bind(None::<&str>)
    334                 .bind(None::<&str>)
    335                 .bind(*bounced as i64),
    336             TxOutKind::Talerable(subject) => query
    337                 .bind(&subject.wtid)
    338                 .bind(subject.exchange_base_url.as_str())
    339                 .bind(&subject.metadata)
    340                 .bind(None::<i64>),
    341         };
    342         query
    343             .bind_timestamp(now)
    344             .try_map(|r: PgRow| {
    345                 Ok(AddOutgoingResult {
    346                     result: r.try_get(0)?,
    347                     row_id: r.try_get_u64(1)?,
    348                 })
    349             })
    350             .fetch_one(&mut *db)
    351     })
    352 }
    353 
    354 #[derive(Debug, PartialEq, Eq)]
    355 pub struct OutFailureResult {
    356     pub initiated_id: Option<u64>,
    357     pub new: bool,
    358 }
    359 
    360 pub async fn register_tx_out_failure(
    361     db: &mut PgConnection,
    362     code: u64,
    363     bounced: Option<u32>,
    364     now: &Timestamp,
    365 ) -> sqlx::Result<OutFailureResult> {
    366     serialized!(
    367         sqlx::query(
    368             "
    369                 SELECT out_new, out_initiated_id 
    370                 FROM register_tx_out_failure($1, $2, $3)
    371             ",
    372         )
    373         .bind(code as i64)
    374         .bind(bounced.map(|i| i as i32))
    375         .bind_timestamp(now)
    376         .try_map(|r: PgRow| {
    377             Ok(OutFailureResult {
    378                 new: r.try_get(0)?,
    379                 initiated_id: r.try_get::<Option<i64>, _>(1)?.map(|i| i as u64),
    380             })
    381         })
    382         .fetch_one(&mut *db)
    383     )
    384 }
    385 
    386 #[derive(Debug, PartialEq, Eq)]
    387 pub enum TransferResult {
    388     Success { id: u64, initiated_at: Timestamp },
    389     RequestUidReuse,
    390     WtidReuse,
    391 }
    392 
    393 #[derive(Debug, Clone)]
    394 pub struct Transfer {
    395     pub request_uid: HashCode,
    396     pub amount: Decimal,
    397     pub exchange_base_url: Url,
    398     pub metadata: Option<CompactString>,
    399     pub wtid: ShortHashCode,
    400     pub creditor: FullHuPayto,
    401 }
    402 
    403 pub async fn make_transfer(
    404     db: &PgPool,
    405     tx: &Transfer,
    406     now: &Timestamp,
    407 ) -> sqlx::Result<TransferResult> {
    408     let subject = fmt_out_subject(&tx.wtid, &tx.exchange_base_url, tx.metadata.as_deref());
    409     serialized!(
    410         sqlx::query(
    411             "
    412                 SELECT out_request_uid_reuse, out_wtid_reuse, out_initiated_row_id, out_initiated_at
    413                 FROM taler_transfer($1, $2, $3, $4, $5, $6, $7, $8, $9)
    414             ",
    415         )
    416         .bind(&tx.request_uid)
    417         .bind(&tx.wtid)
    418         .bind(&subject)
    419         .bind(tx.amount)
    420         .bind(tx.exchange_base_url.as_str())
    421         .bind(&tx.metadata)
    422         .bind(tx.creditor.iban())
    423         .bind(&tx.creditor.name)
    424         .bind_timestamp(now)
    425         .try_map(|r: PgRow| {
    426             Ok(if r.try_get_flag(0)? {
    427                 TransferResult::RequestUidReuse
    428             } else if r.try_get_flag(1)? {
    429                 TransferResult::WtidReuse
    430             } else {
    431                 TransferResult::Success {
    432                     id: r.try_get_u64(2)?,
    433                     initiated_at: r.try_get_timestamp(3)?,
    434                 }
    435             })
    436         })
    437         .fetch_one(db)
    438     )
    439 }
    440 
    441 #[derive(Debug, PartialEq, Eq)]
    442 pub struct BounceResult {
    443     pub tx_id: u64,
    444     pub tx_new: bool,
    445     pub bounce_id: u64,
    446     pub bounce_new: bool,
    447 }
    448 
    449 pub async fn register_bounce_tx_in(
    450     db: &mut PgConnection,
    451     tx: &TxIn,
    452     reason: &str,
    453     now: &Timestamp,
    454 ) -> sqlx::Result<BounceResult> {
    455     serialized!(
    456         sqlx::query(
    457             "
    458                 SELECT out_tx_row_id, out_tx_new, out_bounce_row_id, out_bounce_new
    459                 FROM register_bounce_tx_in($1, $2, $3, $4, $5, $6, $7, $8)
    460             ",
    461         )
    462         .bind(tx.code as i64)
    463         .bind(&tx.amount)
    464         .bind(&tx.subject)
    465         .bind(tx.debtor.iban())
    466         .bind(&tx.debtor.name)
    467         .bind_date(&tx.value_date)
    468         .bind(reason)
    469         .bind_timestamp(now)
    470         .try_map(|r: PgRow| {
    471             Ok(BounceResult {
    472                 tx_id: r.try_get_u64(0)?,
    473                 tx_new: r.try_get(1)?,
    474                 bounce_id: r.try_get_u64(2)?,
    475                 bounce_new: r.try_get(3)?,
    476             })
    477         })
    478         .fetch_one(&mut *db)
    479     )
    480 }
    481 
    482 pub async fn transfer_page(
    483     db: &PgPool,
    484     status: &Option<TransferState>,
    485     params: &Page,
    486 ) -> sqlx::Result<Vec<TransferListStatus>> {
    487     page(
    488         db,
    489         "initiated_id",
    490         params,
    491         || {
    492             let mut builder = QueryBuilder::new(
    493                 "
    494                     SELECT
    495                         initiated_id,
    496                         status,
    497                         amount,
    498                         credit_account,
    499                         credit_name,
    500                         initiated_at
    501                     FROM transfer 
    502                     JOIN initiated USING (initiated_id)
    503                     WHERE
    504                 ",
    505             );
    506             if let Some(status) = status {
    507                 builder.push(" status = ").push_bind(status).push(" AND ");
    508             }
    509             builder
    510         },
    511         |r: PgRow| {
    512             Ok(TransferListStatus {
    513                 row_id: r.try_get_safeu64(0)?,
    514                 status: r.try_get(1)?,
    515                 amount: r.try_get_amount(2, &CURRENCY)?,
    516                 credit_account: r.try_get_iban(3)?.as_full_payto(r.try_get(4)?),
    517                 timestamp: r.try_get_timestamp(5)?.into(),
    518             })
    519         },
    520     )
    521     .await
    522 }
    523 
    524 pub async fn outgoing_history(
    525     db: &PgPool,
    526     params: &History,
    527     listen: impl FnOnce() -> Receiver<i64>,
    528 ) -> sqlx::Result<Vec<OutgoingBankTransaction>> {
    529     history(
    530         db,
    531         "tx_out_id",
    532         params,
    533         listen,
    534         || {
    535             QueryBuilder::new(
    536                 "
    537                 SELECT
    538                     tx_out_id,
    539                     amount,
    540                     credit_account,
    541                     credit_name,
    542                     valued_at,
    543                     exchange_base_url,
    544                     metadata,
    545                     wtid
    546                 FROM taler_out
    547                 JOIN tx_out USING (tx_out_id)
    548                 WHERE
    549             ",
    550             )
    551         },
    552         |r: PgRow| {
    553             Ok(OutgoingBankTransaction {
    554                 row_id: r.try_get_safeu64(0)?,
    555                 amount: r.try_get_amount(1, &CURRENCY)?,
    556                 debit_fee: None,
    557                 credit_account: r.try_get_iban(2)?.as_full_payto(r.try_get(3)?),
    558                 date: r.try_get_timestamp(4)?.into(),
    559                 exchange_base_url: r.try_get_url(5)?,
    560                 metadata: r.try_get(6)?,
    561                 wtid: r.try_get(7)?,
    562             })
    563         },
    564     )
    565     .await
    566 }
    567 
    568 pub async fn incoming_history(
    569     db: &PgPool,
    570     params: &History,
    571     listen: impl FnOnce() -> Receiver<i64>,
    572 ) -> sqlx::Result<Vec<IncomingBankTransaction>> {
    573     history(
    574         db,
    575         "tx_in_id",
    576         params,
    577         listen,
    578         || {
    579             QueryBuilder::new(
    580                 "
    581                 SELECT
    582                     type,
    583                     tx_in_id,
    584                     amount,
    585                     debit_account,
    586                     debit_name,
    587                     valued_at,
    588                     metadata,
    589                     authorization_pub,
    590                     authorization_sig
    591                 FROM taler_in
    592                 JOIN tx_in USING (tx_in_id)
    593                 WHERE
    594             ",
    595             )
    596         },
    597         |r: PgRow| {
    598             Ok(match r.try_get(0)? {
    599                 IncomingType::reserve => IncomingBankTransaction::Reserve {
    600                     row_id: r.try_get_safeu64(1)?,
    601                     amount: r.try_get_amount(2, &CURRENCY)?,
    602                     credit_fee: None,
    603                     debit_account: r.try_get_iban(3)?.as_full_payto(r.try_get(4)?),
    604                     date: r.try_get_timestamp(5)?.into(),
    605                     reserve_pub: r.try_get(6)?,
    606                     authorization_pub: r.try_get(7)?,
    607                     authorization_sig: r.try_get(8)?,
    608                 },
    609                 IncomingType::kyc => IncomingBankTransaction::Kyc {
    610                     row_id: r.try_get_safeu64(1)?,
    611                     amount: r.try_get_amount(2, &CURRENCY)?,
    612                     credit_fee: None,
    613                     debit_account: r.try_get_iban(3)?.as_full_payto(r.try_get(4)?),
    614                     date: r.try_get_timestamp(5)?.into(),
    615                     account_pub: r.try_get(6)?,
    616                     authorization_pub: r.try_get(7)?,
    617                     authorization_sig: r.try_get(8)?,
    618                 },
    619                 IncomingType::map => unimplemented!("MAP are never listed in the history"),
    620             })
    621         },
    622     )
    623     .await
    624 }
    625 
    626 pub async fn revenue_history(
    627     db: &PgPool,
    628     params: &History,
    629     listen: impl FnOnce() -> Receiver<i64>,
    630 ) -> sqlx::Result<Vec<RevenueIncomingBankTransaction>> {
    631     history(
    632         db,
    633         "tx_in_id",
    634         params,
    635         listen,
    636         || {
    637             QueryBuilder::new(
    638                 "
    639                 SELECT
    640                     tx_in_id,
    641                     valued_at,
    642                     amount,
    643                     debit_account,
    644                     debit_name,
    645                     subject
    646                 FROM tx_in
    647                 WHERE
    648             ",
    649             )
    650         },
    651         |r: PgRow| {
    652             Ok(RevenueIncomingBankTransaction {
    653                 row_id: r.try_get_safeu64(0)?,
    654                 date: r.try_get_timestamp(1)?.into(),
    655                 amount: r.try_get_amount(2, &CURRENCY)?,
    656                 credit_fee: None,
    657                 debit_account: r.try_get_iban(3)?.as_full_payto(r.try_get(4)?),
    658                 subject: r.try_get(5)?,
    659             })
    660         },
    661     )
    662     .await
    663 }
    664 
    665 pub async fn transfer_by_id(db: &PgPool, id: u64) -> sqlx::Result<Option<TransferStatus>> {
    666     serialized!(
    667         sqlx::query(
    668             "
    669                 SELECT
    670                     status,
    671                     status_msg,
    672                     amount,
    673                     exchange_base_url,
    674                     metadata,
    675                     wtid,
    676                     credit_account,
    677                     credit_name,
    678                     initiated_at
    679                 FROM transfer 
    680                 JOIN initiated USING (initiated_id) 
    681                 WHERE initiated_id = $1
    682             ",
    683         )
    684         .bind(id as i64)
    685         .try_map(|r: PgRow| {
    686             Ok(TransferStatus {
    687                 status: r.try_get(0)?,
    688                 status_msg: r.try_get(1)?,
    689                 amount: r.try_get_amount(2, &CURRENCY)?,
    690                 origin_exchange_url: r.try_get(3)?,
    691                 metadata: r.try_get(4)?,
    692                 wtid: r.try_get(5)?,
    693                 credit_account: r.try_get_iban(6)?.as_full_payto(r.try_get(7)?),
    694                 timestamp: r.try_get_timestamp(8)?.into(),
    695             })
    696         })
    697         .fetch_optional(db)
    698     )
    699 }
    700 
    701 /** Get a batch of pending initiated transactions not attempted since [start] */
    702 pub async fn pending_batch(
    703     db: &mut PgConnection,
    704     start: &Timestamp,
    705 ) -> sqlx::Result<Vec<Initiated>> {
    706     serialized!(
    707         sqlx::query(
    708             "
    709                 SELECT initiated_id, amount, subject, credit_account, credit_name
    710                 FROM initiated 
    711                 WHERE magnet_code IS NULL
    712                     AND status='pending'
    713                     AND (last_submitted IS NULL OR last_submitted < $1)
    714                 LIMIT 100
    715             ",
    716         )
    717         .bind_timestamp(start)
    718         .try_map(|r: PgRow| {
    719             Ok(Initiated {
    720                 id: r.try_get_u64(0)?,
    721                 amount: r.try_get_amount(1, &CURRENCY)?,
    722                 subject: r.try_get(2)?,
    723                 creditor: FullHuPayto::new(r.try_get_parse(3)?, r.try_get(4)?),
    724             })
    725         })
    726         .fetch_all(&mut *db)
    727     )
    728 }
    729 
    730 /** Get an initiated transaction matching the given magnet [code] */
    731 pub async fn initiated_by_code(
    732     db: &mut PgConnection,
    733     code: u64,
    734 ) -> sqlx::Result<Option<Initiated>> {
    735     serialized!(
    736         sqlx::query(
    737             "
    738                 SELECT initiated_id, amount, subject, credit_account, credit_name
    739                 FROM initiated 
    740                 WHERE magnet_code IS $1
    741             ",
    742         )
    743         .bind(code as i64)
    744         .try_map(|r: PgRow| {
    745             Ok(Initiated {
    746                 id: r.try_get_u64(0)?,
    747                 amount: r.try_get_amount(1, &CURRENCY)?,
    748                 subject: r.try_get(2)?,
    749                 creditor: FullHuPayto::new(r.try_get_parse(3)?, r.try_get(4)?),
    750             })
    751         })
    752         .fetch_optional(&mut *db)
    753     )
    754 }
    755 
    756 /** Update status of a successful submitted initiated transaction */
    757 pub async fn initiated_submit_success(
    758     db: &mut PgConnection,
    759     id: u64,
    760     timestamp: &Timestamp,
    761     magnet_code: u64,
    762 ) -> sqlx::Result<()> {
    763     serialized!(
    764         sqlx::query(
    765             "
    766                 UPDATE initiated
    767                 SET status='pending', submission_counter=submission_counter+1, last_submitted=$1, magnet_code=$2
    768                 WHERE initiated_id=$3
    769             "
    770         ).bind_timestamp(timestamp)
    771         .bind(magnet_code as i64)
    772         .bind(id as i64)
    773         .execute(&mut *db)
    774     )?;
    775     Ok(())
    776 }
    777 
    778 /** Update status of a permanently failed initiated transaction */
    779 pub async fn initiated_submit_permanent_failure(
    780     db: &mut PgConnection,
    781     id: u64,
    782     timestamp: &Timestamp,
    783     msg: &str,
    784 ) -> sqlx::Result<()> {
    785     serialized!(
    786         sqlx::query(
    787             "
    788                 UPDATE initiated
    789                 SET status='permanent_failure', status_msg=$2
    790                 WHERE initiated_id=$3
    791             ",
    792         )
    793         .bind_timestamp(timestamp)
    794         .bind(msg)
    795         .bind(id as i64)
    796         .execute(&mut *db)
    797     )?;
    798     Ok(())
    799 }
    800 
    801 /** Check if an initiated transaction exist for a magnet code */
    802 pub async fn initiated_exists_for_code(
    803     db: &mut PgConnection,
    804     code: u64,
    805 ) -> sqlx::Result<Option<u64>> {
    806     serialized!(
    807         sqlx::query("SELECT initiated_id FROM initiated WHERE magnet_code=$1")
    808             .bind(code as i64)
    809             .try_map(|r| Ok(r.try_get::<i64, _>(0)? as u64))
    810             .fetch_optional(&mut *db)
    811     )
    812 }
    813 
    814 /** Get JSON value from KV table */
    815 pub async fn kv_get<T: DeserializeOwned + Unpin + Send>(
    816     db: &mut PgConnection,
    817     key: &str,
    818 ) -> sqlx::Result<Option<T>> {
    819     serialized!(
    820         sqlx::query("SELECT value FROM kv WHERE key=$1")
    821             .bind(key)
    822             .try_map(|r| Ok(r.try_get::<sqlx::types::Json<T>, _>(0)?.0))
    823             .fetch_optional(&mut *db)
    824     )
    825 }
    826 
    827 /** Set JSON value in KV table */
    828 pub async fn kv_set<T: Serialize>(db: &mut PgConnection, key: &str, value: &T) -> sqlx::Result<()> {
    829     serialized!(
    830         sqlx::query("INSERT INTO kv (key, value) VALUES ($1, $2) ON CONFLICT (key) DO UPDATE SET value=EXCLUDED.value")
    831             .bind(key)
    832             .bind(sqlx::types::Json(value))
    833             .execute(&mut *db)
    834     )?;
    835     Ok(())
    836 }
    837 
    838 pub enum RegistrationResult {
    839     Success,
    840     ReservePubReuse,
    841 }
    842 
    843 pub async fn transfer_register(
    844     db: &PgPool,
    845     req: &RegistrationRequest,
    846 ) -> sqlx::Result<RegistrationResult> {
    847     let ty: IncomingType = req.r#type.into();
    848     serialized!(
    849         sqlx::query(
    850             "SELECT out_reserve_pub_reuse FROM register_prepared_transfers($1,$2,$3,$4,$5,$6)"
    851         )
    852         .bind(ty)
    853         .bind(&req.account_pub)
    854         .bind(&req.authorization_pub)
    855         .bind(&req.authorization_sig)
    856         .bind(req.recurrent)
    857         .bind_timestamp(&Timestamp::now())
    858         .try_map(|r: PgRow| {
    859             Ok(if r.try_get_flag("out_reserve_pub_reuse")? {
    860                 RegistrationResult::ReservePubReuse
    861             } else {
    862                 RegistrationResult::Success
    863             })
    864         })
    865         .fetch_one(db)
    866     )
    867 }
    868 
    869 pub async fn transfer_unregister(db: &PgPool, req: &Unregistration) -> sqlx::Result<bool> {
    870     serialized!(
    871         sqlx::query("SELECT out_found FROM delete_prepared_transfers($1,$2)")
    872             .bind(&req.authorization_pub)
    873             .bind_timestamp(&Timestamp::now())
    874             .try_map(|r: PgRow| r.try_get_flag("out_found"))
    875             .fetch_one(db)
    876     )
    877 }
    878 
    879 #[cfg(test)]
    880 mod test {
    881     use jiff::{Span, Timestamp, Zoned};
    882     use serde_json::json;
    883     use sqlx::{PgPool, Postgres, pool::PoolConnection, postgres::PgRow};
    884     use taler_api::{
    885         db::TypeHelper,
    886         notification::dummy_listen,
    887         subject::{IncomingSubject, OutgoingSubject},
    888     };
    889     use taler_common::{
    890         api_common::{EddsaPublicKey, HashCode, ShortHashCode},
    891         api_params::{History, Page},
    892         types::{
    893             amount::{amount, decimal},
    894             url,
    895             utils::now_sql_stable_ts,
    896         },
    897     };
    898 
    899     use crate::{
    900         constants::CONFIG_SOURCE,
    901         db::{
    902             self, AddIncomingResult, AddOutgoingResult, BounceResult, Initiated, OutFailureResult,
    903             TransferResult, TxIn, TxOut, TxOutKind, kv_get, kv_set, make_transfer,
    904             register_bounce_tx_in, register_tx_in, register_tx_in_admin, register_tx_out,
    905         },
    906         magnet_api::types::TxStatus,
    907         magnet_payto,
    908     };
    909 
    910     use super::TxInAdmin;
    911 
    912     async fn setup() -> (PoolConnection<Postgres>, PgPool) {
    913         taler_test_utils::db::db_test_setup(CONFIG_SOURCE).await
    914     }
    915 
    916     #[tokio::test]
    917     async fn kv() {
    918         let (mut db, _) = setup().await;
    919 
    920         let value = json!({
    921             "name": "Mr Smith",
    922             "no way": 32
    923         });
    924 
    925         assert_eq!(
    926             kv_get::<serde_json::Value>(&mut db, "value").await.unwrap(),
    927             None
    928         );
    929         kv_set(&mut db, "value", &value).await.unwrap();
    930         kv_set(&mut db, "value", &value).await.unwrap();
    931         assert_eq!(
    932             kv_get::<serde_json::Value>(&mut db, "value").await.unwrap(),
    933             Some(value)
    934         );
    935     }
    936 
    937     #[tokio::test]
    938     async fn tx_in() {
    939         let (mut db, pool) = setup().await;
    940 
    941         let mut routine = async |first: &Option<IncomingSubject>,
    942                                  second: &Option<IncomingSubject>| {
    943             let (id, code) =
    944                 sqlx::query("SELECT count(*) + 1, COALESCE(max(magnet_code), 0) + 20 FROM tx_in")
    945                     .try_map(|r: PgRow| Ok((r.try_get_u64(0)?, r.try_get_u64(1)?)))
    946                     .fetch_one(&mut *db)
    947                     .await
    948                     .unwrap();
    949             let now = now_sql_stable_ts();
    950             let date = Zoned::now().date();
    951             let later = date.tomorrow().unwrap();
    952             let tx = TxIn {
    953                 code,
    954                 amount: amount("EUR:10"),
    955                 subject: "subject".into(),
    956                 debtor: magnet_payto(
    957                     "payto://iban/HU30162000031000163100000000?receiver-name=name",
    958                 ),
    959                 value_date: date,
    960                 status: TxStatus::Completed,
    961             };
    962             // Insert
    963             assert_eq!(
    964                 register_tx_in(&mut db, &tx, first, &now)
    965                     .await
    966                     .expect("register tx in"),
    967                 AddIncomingResult::Success {
    968                     new: true,
    969                     pending: false,
    970                     row_id: id,
    971                     valued_at: date
    972                 }
    973             );
    974             // Idempotent
    975             assert_eq!(
    976                 register_tx_in(
    977                     &mut db,
    978                     &TxIn {
    979                         value_date: later,
    980                         ..tx.clone()
    981                     },
    982                     first,
    983                     &now
    984                 )
    985                 .await
    986                 .expect("register tx in"),
    987                 AddIncomingResult::Success {
    988                     new: false,
    989                     pending: false,
    990                     row_id: id,
    991                     valued_at: date
    992                 }
    993             );
    994             // Many
    995             assert_eq!(
    996                 register_tx_in(
    997                     &mut db,
    998                     &TxIn {
    999                         code: code + 1,
   1000                         value_date: later,
   1001                         ..tx
   1002                     },
   1003                     second,
   1004                     &now
   1005                 )
   1006                 .await
   1007                 .expect("register tx in"),
   1008                 AddIncomingResult::Success {
   1009                     new: true,
   1010                     pending: false,
   1011                     row_id: id + 1,
   1012                     valued_at: later
   1013                 }
   1014             );
   1015         };
   1016 
   1017         // Empty db
   1018         assert_eq!(
   1019             db::revenue_history(&pool, &History::default(), dummy_listen)
   1020                 .await
   1021                 .unwrap(),
   1022             Vec::new()
   1023         );
   1024         assert_eq!(
   1025             db::incoming_history(&pool, &History::default(), dummy_listen)
   1026                 .await
   1027                 .unwrap(),
   1028             Vec::new()
   1029         );
   1030 
   1031         // Regular transaction
   1032         routine(&None, &None).await;
   1033 
   1034         // Reserve transaction
   1035         routine(
   1036             &Some(IncomingSubject::Reserve(EddsaPublicKey::rand())),
   1037             &Some(IncomingSubject::Reserve(EddsaPublicKey::rand())),
   1038         )
   1039         .await;
   1040 
   1041         // Kyc transaction
   1042         routine(
   1043             &Some(IncomingSubject::Kyc(EddsaPublicKey::rand())),
   1044             &Some(IncomingSubject::Kyc(EddsaPublicKey::rand())),
   1045         )
   1046         .await;
   1047 
   1048         // History
   1049         assert_eq!(
   1050             db::revenue_history(&pool, &History::default(), dummy_listen)
   1051                 .await
   1052                 .unwrap()
   1053                 .len(),
   1054             6
   1055         );
   1056         assert_eq!(
   1057             db::incoming_history(&pool, &History::default(), dummy_listen)
   1058                 .await
   1059                 .unwrap()
   1060                 .len(),
   1061             4
   1062         );
   1063     }
   1064 
   1065     #[tokio::test]
   1066     async fn tx_in_admin() {
   1067         let (_, pool) = setup().await;
   1068 
   1069         // Empty db
   1070         assert_eq!(
   1071             db::incoming_history(&pool, &History::default(), dummy_listen)
   1072                 .await
   1073                 .unwrap(),
   1074             Vec::new()
   1075         );
   1076 
   1077         let now = now_sql_stable_ts();
   1078         let later = now + Span::new().hours(2);
   1079         let date = Zoned::now().date();
   1080         let tx = TxInAdmin {
   1081             amount: amount("EUR:10"),
   1082             subject: "subject".to_owned(),
   1083             debtor: magnet_payto("payto://iban/HU30162000031000163100000000?receiver-name=name"),
   1084             metadata: IncomingSubject::Reserve(EddsaPublicKey::rand()),
   1085         };
   1086         // Insert
   1087         assert_eq!(
   1088             register_tx_in_admin(&pool, &tx, &now)
   1089                 .await
   1090                 .expect("register tx in"),
   1091             AddIncomingResult::Success {
   1092                 new: true,
   1093                 pending: false,
   1094                 row_id: 1,
   1095                 valued_at: date
   1096             }
   1097         );
   1098         // Idempotent
   1099         assert_eq!(
   1100             register_tx_in_admin(&pool, &tx, &later)
   1101                 .await
   1102                 .expect("register tx in"),
   1103             AddIncomingResult::Success {
   1104                 new: false,
   1105                 pending: false,
   1106                 row_id: 1,
   1107                 valued_at: date
   1108             }
   1109         );
   1110         // Many
   1111         assert_eq!(
   1112             register_tx_in_admin(
   1113                 &pool,
   1114                 &TxInAdmin {
   1115                     subject: "Other".to_owned(),
   1116                     metadata: IncomingSubject::Reserve(EddsaPublicKey::rand()),
   1117                     ..tx.clone()
   1118                 },
   1119                 &later
   1120             )
   1121             .await
   1122             .expect("register tx in"),
   1123             AddIncomingResult::Success {
   1124                 new: true,
   1125                 pending: false,
   1126                 row_id: 2,
   1127                 valued_at: date
   1128             }
   1129         );
   1130 
   1131         // History
   1132         assert_eq!(
   1133             db::incoming_history(&pool, &History::default(), dummy_listen)
   1134                 .await
   1135                 .unwrap()
   1136                 .len(),
   1137             2
   1138         );
   1139     }
   1140 
   1141     #[tokio::test]
   1142     async fn tx_out() {
   1143         let (mut db, pool) = setup().await;
   1144 
   1145         let mut routine = async |first: &TxOutKind, second: &TxOutKind| {
   1146             let (id, code) =
   1147                 sqlx::query("SELECT count(*) + 1, COALESCE(max(magnet_code), 0) + 20 FROM tx_out")
   1148                     .try_map(|r: PgRow| Ok((r.try_get_u64(0)?, r.try_get_u64(1)?)))
   1149                     .fetch_one(&mut *db)
   1150                     .await
   1151                     .unwrap();
   1152             let now = now_sql_stable_ts();
   1153             let date = Zoned::now().date();
   1154             let later = date.tomorrow().unwrap();
   1155             let tx = TxOut {
   1156                 code,
   1157                 amount: amount("HUF:10"),
   1158                 subject: "subject".into(),
   1159                 creditor: magnet_payto(
   1160                     "payto://iban/HU30162000031000163100000000?receiver-name=name",
   1161                 ),
   1162                 value_date: date,
   1163                 status: TxStatus::Completed,
   1164             };
   1165             assert!(matches!(
   1166                 make_transfer(
   1167                     &pool,
   1168                     &db::Transfer {
   1169                         request_uid: HashCode::rand(),
   1170                         amount: decimal("10"),
   1171                         exchange_base_url: url("https://exchange.test.com/"),
   1172                         metadata: None,
   1173                         wtid: ShortHashCode::rand(),
   1174                         creditor: tx.creditor.clone()
   1175                     },
   1176                     &now
   1177                 )
   1178                 .await
   1179                 .unwrap(),
   1180                 TransferResult::Success { .. }
   1181             ));
   1182             db::initiated_submit_success(&mut db, 1, &Timestamp::now(), tx.code)
   1183                 .await
   1184                 .expect("status success");
   1185 
   1186             // Insert
   1187             assert_eq!(
   1188                 register_tx_out(&mut db, &tx, first, &now)
   1189                     .await
   1190                     .expect("register tx out"),
   1191                 AddOutgoingResult {
   1192                     result: db::RegisterResult::known,
   1193                     row_id: id,
   1194                 }
   1195             );
   1196             // Idempotent
   1197             assert_eq!(
   1198                 register_tx_out(
   1199                     &mut db,
   1200                     &TxOut {
   1201                         value_date: later,
   1202                         ..tx.clone()
   1203                     },
   1204                     first,
   1205                     &now
   1206                 )
   1207                 .await
   1208                 .expect("register tx out"),
   1209                 AddOutgoingResult {
   1210                     result: db::RegisterResult::idempotent,
   1211                     row_id: id,
   1212                 }
   1213             );
   1214             // Recovered
   1215             assert_eq!(
   1216                 register_tx_out(
   1217                     &mut db,
   1218                     &TxOut {
   1219                         code: code + 1,
   1220                         value_date: later,
   1221                         ..tx.clone()
   1222                     },
   1223                     second,
   1224                     &now
   1225                 )
   1226                 .await
   1227                 .expect("register tx out"),
   1228                 AddOutgoingResult {
   1229                     result: db::RegisterResult::recovered,
   1230                     row_id: id + 1,
   1231                 }
   1232             );
   1233         };
   1234 
   1235         // Empty db
   1236         assert_eq!(
   1237             db::outgoing_history(&pool, &History::default(), dummy_listen)
   1238                 .await
   1239                 .unwrap(),
   1240             Vec::new()
   1241         );
   1242 
   1243         // Regular transaction
   1244         routine(&TxOutKind::Simple, &TxOutKind::Simple).await;
   1245 
   1246         // Talerable transaction
   1247         routine(
   1248             &TxOutKind::Talerable(OutgoingSubject::rand()),
   1249             &TxOutKind::Talerable(OutgoingSubject::rand()),
   1250         )
   1251         .await;
   1252 
   1253         // Bounced transaction
   1254         routine(&TxOutKind::Bounce(21), &TxOutKind::Bounce(42)).await;
   1255 
   1256         // History
   1257         assert_eq!(
   1258             db::outgoing_history(&pool, &History::default(), dummy_listen)
   1259                 .await
   1260                 .unwrap()
   1261                 .len(),
   1262             2
   1263         );
   1264     }
   1265 
   1266     #[tokio::test]
   1267     async fn tx_out_failure() {
   1268         let (mut db, pool) = setup().await;
   1269 
   1270         let now = now_sql_stable_ts();
   1271 
   1272         // Unknown
   1273         assert_eq!(
   1274             db::register_tx_out_failure(&mut db, 42, None, &now)
   1275                 .await
   1276                 .unwrap(),
   1277             OutFailureResult {
   1278                 initiated_id: None,
   1279                 new: false
   1280             }
   1281         );
   1282         assert_eq!(
   1283             db::register_tx_out_failure(&mut db, 42, Some(12), &now)
   1284                 .await
   1285                 .unwrap(),
   1286             OutFailureResult {
   1287                 initiated_id: None,
   1288                 new: false
   1289             }
   1290         );
   1291 
   1292         // Initiated
   1293         let req = db::Transfer {
   1294             request_uid: HashCode::rand(),
   1295             amount: decimal("10"),
   1296             exchange_base_url: url("https://exchange.test.com/"),
   1297             metadata: None,
   1298             wtid: ShortHashCode::rand(),
   1299             creditor: magnet_payto("payto://iban/HU30162000031000163100000000?receiver-name=name"),
   1300         };
   1301         let payto = magnet_payto("payto://iban/HU30162000031000163100000000?receiver-name=name");
   1302         assert_eq!(
   1303             make_transfer(&pool, &req, &now).await.unwrap(),
   1304             TransferResult::Success {
   1305                 id: 1,
   1306                 initiated_at: now
   1307             }
   1308         );
   1309         db::initiated_submit_success(&mut db, 1, &Timestamp::now(), 34)
   1310             .await
   1311             .expect("status success");
   1312         assert_eq!(
   1313             db::register_tx_out_failure(&mut db, 34, None, &now)
   1314                 .await
   1315                 .unwrap(),
   1316             OutFailureResult {
   1317                 initiated_id: Some(1),
   1318                 new: true
   1319             }
   1320         );
   1321         assert_eq!(
   1322             db::register_tx_out_failure(&mut db, 34, None, &now)
   1323                 .await
   1324                 .unwrap(),
   1325             OutFailureResult {
   1326                 initiated_id: Some(1),
   1327                 new: false
   1328             }
   1329         );
   1330 
   1331         // Recovered bounce
   1332         let tx = TxIn {
   1333             code: 12,
   1334             amount: amount("HUF:11"),
   1335             subject: "malformed transaction".into(),
   1336             debtor: payto,
   1337             value_date: Zoned::now().date(),
   1338             status: TxStatus::Completed,
   1339         };
   1340         assert_eq!(
   1341             db::register_bounce_tx_in(&mut db, &tx, "no reason", &now)
   1342                 .await
   1343                 .unwrap(),
   1344             BounceResult {
   1345                 tx_id: 1,
   1346                 tx_new: true,
   1347                 bounce_id: 2,
   1348                 bounce_new: true
   1349             }
   1350         );
   1351         assert_eq!(
   1352             db::register_tx_out_failure(&mut db, 10, Some(12), &now)
   1353                 .await
   1354                 .unwrap(),
   1355             OutFailureResult {
   1356                 initiated_id: Some(2),
   1357                 new: true
   1358             }
   1359         );
   1360         assert_eq!(
   1361             db::register_tx_out_failure(&mut db, 10, Some(12), &now)
   1362                 .await
   1363                 .unwrap(),
   1364             OutFailureResult {
   1365                 initiated_id: Some(2),
   1366                 new: false
   1367             }
   1368         );
   1369     }
   1370 
   1371     #[tokio::test]
   1372     async fn transfer() {
   1373         let (_, pool) = setup().await;
   1374 
   1375         // Empty db
   1376         assert_eq!(db::transfer_by_id(&pool, 0).await.unwrap(), None);
   1377         assert_eq!(
   1378             db::transfer_page(&pool, &None, &Page::default())
   1379                 .await
   1380                 .unwrap(),
   1381             Vec::new()
   1382         );
   1383 
   1384         let req = db::Transfer {
   1385             request_uid: HashCode::rand(),
   1386             amount: decimal("10"),
   1387             exchange_base_url: url("https://exchange.test.com/"),
   1388             metadata: None,
   1389             wtid: ShortHashCode::rand(),
   1390             creditor: magnet_payto("payto://iban/HU02162000031000164800000000?receiver-name=name"),
   1391         };
   1392         let now = now_sql_stable_ts();
   1393         let later = now + Span::new().hours(2);
   1394         // Insert
   1395         assert_eq!(
   1396             make_transfer(&pool, &req, &now).await.expect("transfer"),
   1397             TransferResult::Success {
   1398                 id: 1,
   1399                 initiated_at: now
   1400             }
   1401         );
   1402         // Idempotent
   1403         assert_eq!(
   1404             make_transfer(&pool, &req, &later).await.expect("transfer"),
   1405             TransferResult::Success {
   1406                 id: 1,
   1407                 initiated_at: now
   1408             }
   1409         );
   1410         // Request UID reuse
   1411         assert_eq!(
   1412             make_transfer(
   1413                 &pool,
   1414                 &db::Transfer {
   1415                     wtid: ShortHashCode::rand(),
   1416                     ..req.clone()
   1417                 },
   1418                 &now
   1419             )
   1420             .await
   1421             .expect("transfer"),
   1422             TransferResult::RequestUidReuse
   1423         );
   1424         // wtid reuse
   1425         assert_eq!(
   1426             make_transfer(
   1427                 &pool,
   1428                 &db::Transfer {
   1429                     request_uid: HashCode::rand(),
   1430                     ..req.clone()
   1431                 },
   1432                 &now
   1433             )
   1434             .await
   1435             .expect("transfer"),
   1436             TransferResult::WtidReuse
   1437         );
   1438         // Many
   1439         assert_eq!(
   1440             make_transfer(
   1441                 &pool,
   1442                 &db::Transfer {
   1443                     request_uid: HashCode::rand(),
   1444                     wtid: ShortHashCode::rand(),
   1445                     ..req
   1446                 },
   1447                 &later
   1448             )
   1449             .await
   1450             .expect("transfer"),
   1451             TransferResult::Success {
   1452                 id: 2,
   1453                 initiated_at: later
   1454             }
   1455         );
   1456 
   1457         // Get
   1458         assert!(db::transfer_by_id(&pool, 1).await.unwrap().is_some());
   1459         assert!(db::transfer_by_id(&pool, 2).await.unwrap().is_some());
   1460         assert!(db::transfer_by_id(&pool, 3).await.unwrap().is_none());
   1461         assert_eq!(
   1462             db::transfer_page(&pool, &None, &Page::default())
   1463                 .await
   1464                 .unwrap()
   1465                 .len(),
   1466             2
   1467         );
   1468     }
   1469 
   1470     #[tokio::test]
   1471     async fn bounce() {
   1472         let (mut db, _) = setup().await;
   1473 
   1474         let amount = amount("HUF:10");
   1475         let payto = magnet_payto("payto://iban/HU30162000031000163100000000?receiver-name=name");
   1476         let now = now_sql_stable_ts();
   1477         let date = Zoned::now().date();
   1478 
   1479         // Empty db
   1480         assert!(db::pending_batch(&mut db, &now).await.unwrap().is_empty());
   1481 
   1482         // Insert
   1483         assert_eq!(
   1484             register_tx_in(
   1485                 &mut db,
   1486                 &TxIn {
   1487                     code: 13,
   1488                     amount: amount.clone(),
   1489                     subject: "subject".into(),
   1490                     debtor: payto.clone(),
   1491                     value_date: date,
   1492                     status: TxStatus::Completed
   1493                 },
   1494                 &None,
   1495                 &now
   1496             )
   1497             .await
   1498             .expect("register tx in"),
   1499             AddIncomingResult::Success {
   1500                 new: true,
   1501                 pending: false,
   1502                 row_id: 1,
   1503                 valued_at: date
   1504             }
   1505         );
   1506 
   1507         // Bounce
   1508         assert_eq!(
   1509             register_bounce_tx_in(
   1510                 &mut db,
   1511                 &TxIn {
   1512                     code: 12,
   1513                     amount: amount.clone(),
   1514                     subject: "subject".into(),
   1515                     debtor: payto.clone(),
   1516                     value_date: date,
   1517                     status: TxStatus::Completed
   1518                 },
   1519                 "good reason",
   1520                 &now
   1521             )
   1522             .await
   1523             .expect("bounce"),
   1524             BounceResult {
   1525                 tx_id: 2,
   1526                 tx_new: true,
   1527                 bounce_id: 1,
   1528                 bounce_new: true
   1529             }
   1530         );
   1531         // Idempotent
   1532         assert_eq!(
   1533             register_bounce_tx_in(
   1534                 &mut db,
   1535                 &TxIn {
   1536                     code: 12,
   1537                     amount: amount.clone(),
   1538                     subject: "subject".into(),
   1539                     debtor: payto.clone(),
   1540                     value_date: date,
   1541                     status: TxStatus::Completed
   1542                 },
   1543                 "good reason",
   1544                 &now
   1545             )
   1546             .await
   1547             .expect("bounce"),
   1548             BounceResult {
   1549                 tx_id: 2,
   1550                 tx_new: false,
   1551                 bounce_id: 1,
   1552                 bounce_new: false
   1553             }
   1554         );
   1555 
   1556         // Bounce registered
   1557         assert_eq!(
   1558             register_bounce_tx_in(
   1559                 &mut db,
   1560                 &TxIn {
   1561                     code: 13,
   1562                     amount: amount.clone(),
   1563                     subject: "subject".into(),
   1564                     debtor: payto.clone(),
   1565                     value_date: date,
   1566                     status: TxStatus::Completed
   1567                 },
   1568                 "good reason",
   1569                 &now
   1570             )
   1571             .await
   1572             .expect("bounce"),
   1573             BounceResult {
   1574                 tx_id: 1,
   1575                 tx_new: false,
   1576                 bounce_id: 2,
   1577                 bounce_new: true
   1578             }
   1579         );
   1580         // Idempotent registered
   1581         assert_eq!(
   1582             register_bounce_tx_in(
   1583                 &mut db,
   1584                 &TxIn {
   1585                     code: 13,
   1586                     amount: amount.clone(),
   1587                     subject: "subject".into(),
   1588                     debtor: payto.clone(),
   1589                     value_date: date,
   1590                     status: TxStatus::Completed
   1591                 },
   1592                 "good reason",
   1593                 &now
   1594             )
   1595             .await
   1596             .expect("bounce"),
   1597             BounceResult {
   1598                 tx_id: 1,
   1599                 tx_new: false,
   1600                 bounce_id: 2,
   1601                 bounce_new: false
   1602             }
   1603         );
   1604 
   1605         // Batch
   1606         assert_eq!(
   1607             db::pending_batch(&mut db, &now).await.unwrap(),
   1608             &[
   1609                 Initiated {
   1610                     id: 1,
   1611                     amount: amount.clone(),
   1612                     subject: "bounce: 12".into(),
   1613                     creditor: payto.clone()
   1614                 },
   1615                 Initiated {
   1616                     id: 2,
   1617                     amount,
   1618                     subject: "bounce: 13".into(),
   1619                     creditor: payto
   1620                 }
   1621             ]
   1622         );
   1623     }
   1624 
   1625     #[tokio::test]
   1626     async fn status() {
   1627         let (mut db, _) = setup().await;
   1628 
   1629         // Unknown transfer
   1630         db::initiated_submit_permanent_failure(&mut db, 1, &Timestamp::now(), "msg")
   1631             .await
   1632             .unwrap();
   1633         db::initiated_submit_success(&mut db, 1, &Timestamp::now(), 12)
   1634             .await
   1635             .unwrap();
   1636     }
   1637 
   1638     #[tokio::test]
   1639     async fn batch() {
   1640         let (mut db, pool) = setup().await;
   1641         let start = Timestamp::now();
   1642         let magnet_payto =
   1643             magnet_payto("payto://iban/HU30162000031000163100000000?receiver-name=name");
   1644 
   1645         // Empty db
   1646         let pendings = db::pending_batch(&mut db, &start)
   1647             .await
   1648             .expect("pending_batch");
   1649         assert_eq!(pendings.len(), 0);
   1650 
   1651         // Some transfers
   1652         for i in 0..3 {
   1653             make_transfer(
   1654                 &pool,
   1655                 &db::Transfer {
   1656                     request_uid: HashCode::rand(),
   1657                     amount: decimal(format!("{}", i + 1)),
   1658                     exchange_base_url: url("https://exchange.test.com/"),
   1659                     metadata: None,
   1660                     wtid: ShortHashCode::rand(),
   1661                     creditor: magnet_payto.clone(),
   1662                 },
   1663                 &Timestamp::now(),
   1664             )
   1665             .await
   1666             .expect("transfer");
   1667         }
   1668         let pendings = db::pending_batch(&mut db, &start)
   1669             .await
   1670             .expect("pending_batch");
   1671         assert_eq!(pendings.len(), 3);
   1672 
   1673         // Max 100 txs in batch
   1674         for i in 0..100 {
   1675             make_transfer(
   1676                 &pool,
   1677                 &db::Transfer {
   1678                     request_uid: HashCode::rand(),
   1679                     amount: decimal(format!("{}", i + 1)),
   1680                     exchange_base_url: url("https://exchange.test.com/"),
   1681                     metadata: None,
   1682                     wtid: ShortHashCode::rand(),
   1683                     creditor: magnet_payto.clone(),
   1684                 },
   1685                 &Timestamp::now(),
   1686             )
   1687             .await
   1688             .expect("transfer");
   1689         }
   1690         let pendings = db::pending_batch(&mut db, &start)
   1691             .await
   1692             .expect("pending_batch");
   1693         assert_eq!(pendings.len(), 100);
   1694 
   1695         // Skip uploaded
   1696         for i in 0..=10 {
   1697             db::initiated_submit_success(&mut db, i, &Timestamp::now(), i)
   1698                 .await
   1699                 .expect("status success");
   1700         }
   1701         let pendings = db::pending_batch(&mut db, &start)
   1702             .await
   1703             .expect("pending_batch");
   1704         assert_eq!(pendings.len(), 93);
   1705 
   1706         // Skip failed
   1707         for i in 0..=10 {
   1708             db::initiated_submit_permanent_failure(&mut db, 10 + i, &Timestamp::now(), "failure")
   1709                 .await
   1710                 .expect("status failure");
   1711         }
   1712         let pendings = db::pending_batch(&mut db, &start)
   1713             .await
   1714             .expect("pending_batch");
   1715         assert_eq!(pendings.len(), 83);
   1716     }
   1717 }