taler-rust

GNU Taler code in Rust. Largely core banking integrations.
Log | Files | Refs | Submodules | README | LICENSE

db.rs (12634B)


      1 /*
      2   This file is part of TALER
      3   Copyright (C) 2024, 2025, 2026 Taler Systems SA
      4 
      5   TALER is free software; you can redistribute it and/or modify it under the
      6   terms of the GNU Affero General Public License as published by the Free Software
      7   Foundation; either version 3, or (at your option) any later version.
      8 
      9   TALER is distributed in the hope that it will be useful, but WITHOUT ANY
     10   WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
     11   A PARTICULAR PURPOSE.  See the GNU Affero General Public License for more details.
     12 
     13   You should have received a copy of the GNU Affero General Public License along with
     14   TALER; see the file COPYING.  If not, see <http://www.gnu.org/licenses/>
     15 */
     16 
     17 use jiff::Timestamp;
     18 use sqlx::{PgPool, QueryBuilder, Row, postgres::PgRow};
     19 use taler_common::{
     20     api_common::{EddsaPublicKey, SafeU64},
     21     api_params::{History, Page},
     22     api_revenue::RevenueIncomingBankTransaction,
     23     api_transfer::{RegistrationRequest, Unregistration},
     24     api_wire::{
     25         IncomingBankTransaction, OutgoingBankTransaction, TransferListStatus, TransferRequest,
     26         TransferResponse, TransferState, TransferStatus,
     27     },
     28     db::IncomingType,
     29     types::{
     30         amount::{Amount, Currency},
     31         payto::PaytoURI,
     32     },
     33 };
     34 use tokio::sync::watch::{Receiver, Sender};
     35 
     36 use crate::{
     37     db::{BindHelper as _, TypeHelper as _, history, page},
     38     notification_listener, serialized,
     39     subject::fmt_out_subject,
     40 };
     41 
/// Forwards database notifications to in-process watch channels.
///
/// The actual listening logic is provided by the crate-level `notification_listener!`
/// macro, which is not visible from this file.
/// NOTE(review): presumably it keeps listening until a database error occurs — confirm
/// against the macro definition.
///
/// # Parameters
/// * `pool` - connection pool handed (by reference) to the macro.
/// * `outgoing_channel` - updated with the `i64` row id of every `"outgoing_tx"` notification.
/// * `incoming_channel` - updated with the `i64` row id of every `"incoming_tx"` notification.
///
/// # Errors
/// Whatever [`sqlx::Error`] the macro expansion evaluates to.
pub async fn notification_listener(
    pool: PgPool,
    outgoing_channel: Sender<i64>,
    incoming_channel: Sender<i64>,
) -> sqlx::Result<()> {
    notification_listener!(&pool,
        "outgoing_tx" => (row_id: i64) {
            // `send_replace` overwrites the watched value; the previous value it returns is discarded.
            outgoing_channel.send_replace(row_id);
        },
        "incoming_tx" => (row_id: i64) {
            // Same handling as above, for the incoming side.
            incoming_channel.send_replace(row_id);
        }
    )
}
     56 
/// Outcome of [`transfer`], decoded from the flags returned by the `taler_transfer` SQL function.
pub enum TransferResult {
    /// Neither reuse flag was set; carries the row id (`out_transfer_row_id`) and the
    /// creation timestamp (`out_created_at`) returned by the database.
    Success(TransferResponse),
    /// The database set `out_request_uid_reuse`.
    RequestUidReuse,
    /// The database set `out_wtid_reuse`.
    WtidReuse,
}
     62 
     63 pub async fn transfer(db: &PgPool, req: &TransferRequest) -> sqlx::Result<TransferResult> {
     64     let subject = fmt_out_subject(&req.wtid, &req.exchange_base_url, req.metadata.as_deref());
     65     serialized!(
     66         sqlx::query(
     67             "
     68             SELECT out_request_uid_reuse, out_wtid_reuse, out_transfer_row_id, out_created_at
     69             FROM taler_transfer($1,$2,$3,$4,$5,$6,$7,$8)
     70         ",
     71         )
     72         .bind(req.amount)
     73         .bind(req.exchange_base_url.as_str())
     74         .bind(&req.metadata)
     75         .bind(&subject)
     76         .bind(req.credit_account.raw())
     77         .bind(&req.request_uid)
     78         .bind(&req.wtid)
     79         .bind_timestamp(&Timestamp::now())
     80         .try_map(|r: PgRow| {
     81             Ok(if r.try_get_flag("out_request_uid_reuse")? {
     82                 TransferResult::RequestUidReuse
     83             } else if r.try_get_flag("out_wtid_reuse")? {
     84                 TransferResult::WtidReuse
     85             } else {
     86                 TransferResult::Success(TransferResponse {
     87                     row_id: r.try_get_safeu64("out_transfer_row_id")?,
     88                     timestamp: r.try_get_timestamp("out_created_at")?.into(),
     89                 })
     90             })
     91         })
     92         .fetch_one(db)
     93     )
     94 }
     95 
     96 pub async fn transfer_page(
     97     db: &PgPool,
     98     status: &Option<TransferState>,
     99     params: &Page,
    100     currency: &Currency,
    101 ) -> sqlx::Result<Vec<TransferListStatus>> {
    102     page(
    103         db,
    104         params,
    105         "transfer_id",
    106         || {
    107             let mut builder = QueryBuilder::new(
    108                 "
    109                     SELECT
    110                         transfer_id,
    111                         status,
    112                         amount,
    113                         credit_payto,
    114                         created_at
    115                     FROM transfer
    116                     JOIN tx_out USING (tx_out_id)
    117                     WHERE
    118                 ",
    119             );
    120             if let Some(status) = status {
    121                 builder.push(" status = ").push_bind(status).push(" AND ");
    122             }
    123             builder
    124         },
    125         |r: PgRow| {
    126             Ok(TransferListStatus {
    127                 row_id: r.try_get_safeu64("transfer_id")?,
    128                 status: r.try_get("status")?,
    129                 amount: r.try_get_amount("amount", currency)?,
    130                 credit_account: r.try_get_payto("credit_payto")?,
    131                 timestamp: r.try_get_timestamp("created_at")?.into(),
    132             })
    133         },
    134     )
    135     .await
    136 }
    137 
    138 pub async fn transfer_by_id(
    139     db: &PgPool,
    140     id: u64,
    141     currency: &Currency,
    142 ) -> sqlx::Result<Option<TransferStatus>> {
    143     serialized!(
    144         sqlx::query(
    145             "
    146             SELECT
    147                 status,
    148                 status_msg,
    149                 amount,
    150                 exchange_base_url,
    151                 metadata,
    152                 wtid,
    153                 credit_payto,
    154                 created_at
    155             FROM transfer
    156             JOIN tx_out USING (tx_out_id)
    157             WHERE transfer_id = $1
    158         ",
    159         )
    160         .bind(id as i64)
    161         .try_map(|r: PgRow| {
    162             Ok(TransferStatus {
    163                 status: r.try_get("status")?,
    164                 status_msg: r.try_get("status_msg")?,
    165                 amount: r.try_get_amount("amount", currency)?,
    166                 origin_exchange_url: r.try_get("exchange_base_url")?,
    167                 metadata: r.try_get("metadata")?,
    168                 wtid: r.try_get("wtid")?,
    169                 credit_account: r.try_get_payto("credit_payto")?,
    170                 timestamp: r.try_get_timestamp("created_at")?.into(),
    171             })
    172         })
    173         .fetch_optional(db)
    174     )
    175 }
    176 
    177 pub async fn outgoing_revenue(
    178     db: &PgPool,
    179     params: &History,
    180     currency: &Currency,
    181     listen: impl FnOnce() -> Receiver<i64>,
    182 ) -> sqlx::Result<Vec<OutgoingBankTransaction>> {
    183     history(
    184         db,
    185         "transfer_id",
    186         params,
    187         listen,
    188         || {
    189             QueryBuilder::new(
    190                 "
    191                 SELECT
    192                     transfer_id,
    193                     amount,
    194                     exchange_base_url,
    195                     metadata,
    196                     wtid,
    197                     credit_payto,
    198                     created_at
    199                 FROM transfer
    200                 JOIN tx_out USING (tx_out_id)
    201                 WHERE status = 'success' AND
    202             ",
    203             )
    204         },
    205         |r| {
    206             Ok(OutgoingBankTransaction {
    207                 amount: r.try_get_amount("amount", currency)?,
    208                 debit_fee: None,
    209                 wtid: r.try_get("wtid")?,
    210                 credit_account: r.try_get_payto("credit_payto")?,
    211                 row_id: r.try_get_safeu64("transfer_id")?,
    212                 date: r.try_get_timestamp("created_at")?.into(),
    213                 exchange_base_url: r.try_get_url("exchange_base_url")?,
    214                 metadata: r.try_get("metadata")?,
    215             })
    216         },
    217     )
    218     .await
    219 }
    220 
/// Outcome of [`add_incoming`], decoded from the flags returned by the `add_incoming` SQL function.
pub enum AddIncomingResult {
    /// No flag was set; `id` comes from `out_tx_row_id` and `created_at` from `out_created_at`.
    Success { id: SafeU64, created_at: Timestamp },
    /// The database set `out_reserve_pub_reuse`.
    ReservePubReuse,
    /// The database set `out_unknown_mapping`.
    UnknownMapping,
    /// The database set `out_mapping_reuse`.
    MappingReuse,
}
    227 
    228 pub async fn add_incoming(
    229     db: &PgPool,
    230     amount: &Amount,
    231     debit_account: &PaytoURI,
    232     subject: &str,
    233     timestamp: &Timestamp,
    234     ty: IncomingType,
    235     account_pub: &EddsaPublicKey,
    236 ) -> sqlx::Result<AddIncomingResult> {
    237     serialized!(
    238         sqlx::query(
    239             "SELECT out_reserve_pub_reuse, out_mapping_reuse, out_unknown_mapping, out_tx_row_id, out_created_at
    240             FROM add_incoming($1,$2,$3,$4,$5,$6)",
    241         )
    242         .bind(amount)
    243         .bind(subject)
    244         .bind(debit_account.raw())
    245         .bind(ty)
    246         .bind(account_pub)
    247         .bind_timestamp(timestamp)
    248         .try_map(|r: PgRow| {
    249             Ok(if r.try_get_flag("out_reserve_pub_reuse")? {
    250                 AddIncomingResult::ReservePubReuse
    251             } else if r.try_get_flag("out_mapping_reuse")? {
    252                 AddIncomingResult::MappingReuse
    253             } else if r.try_get_flag("out_unknown_mapping")? {
    254                 AddIncomingResult::UnknownMapping
    255             } else{
    256                 AddIncomingResult::Success {
    257                     id: r.try_get_safeu64("out_tx_row_id")?,
    258                     created_at: r.try_get_timestamp("out_created_at")?,
    259                 }
    260             })
    261         })
    262         .fetch_one(db)
    263     )
    264 }
    265 
    266 pub async fn incoming_history(
    267     db: &PgPool,
    268     params: &History,
    269     currency: &Currency,
    270     listen: impl FnOnce() -> Receiver<i64>,
    271 ) -> sqlx::Result<Vec<IncomingBankTransaction>> {
    272     history(
    273         db,
    274         "tx_in_id",
    275         params,
    276         listen,
    277         || {
    278             QueryBuilder::new(
    279                 "
    280                  SELECT
    281                     type,
    282                     tx_in_id,
    283                     amount,
    284                     created_at,
    285                     debit_payto,
    286                     account_pub,
    287                     authorization_pub,
    288                     authorization_sig
    289                 FROM tx_in
    290                 JOIN taler_in USING (tx_in_id)
    291                 WHERE
    292             ",
    293             )
    294         },
    295         |r: PgRow| {
    296             Ok(match r.try_get("type")? {
    297                 IncomingType::reserve => IncomingBankTransaction::Reserve {
    298                     row_id: r.try_get_safeu64("tx_in_id")?,
    299                     date: r.try_get_timestamp("created_at")?.into(),
    300                     amount: r.try_get_amount("amount", currency)?,
    301                     credit_fee: None,
    302                     debit_account: r.try_get_payto("debit_payto")?,
    303                     reserve_pub: r.try_get("account_pub")?,
    304                     authorization_pub: r.try_get("authorization_pub")?,
    305                     authorization_sig: r.try_get("authorization_sig")?,
    306                 },
    307                 IncomingType::kyc => IncomingBankTransaction::Kyc {
    308                     row_id: r.try_get_safeu64("tx_in_id")?,
    309                     date: r.try_get_timestamp("created_at")?.into(),
    310                     amount: r.try_get_amount("amount", currency)?,
    311                     credit_fee: None,
    312                     debit_account: r.try_get_payto("debit_payto")?,
    313                     account_pub: r.try_get("account_pub")?,
    314                     authorization_pub: r.try_get("authorization_pub")?,
    315                     authorization_sig: r.try_get("authorization_sig")?,
    316                 },
    317                 IncomingType::map => unimplemented!("MAP are never listed in the history"),
    318             })
    319         },
    320     )
    321     .await
    322 }
    323 
    324 pub async fn revenue_history(
    325     db: &PgPool,
    326     params: &History,
    327     currency: &Currency,
    328     listen: impl FnOnce() -> Receiver<i64>,
    329 ) -> sqlx::Result<Vec<RevenueIncomingBankTransaction>> {
    330     history(
    331         db,
    332         "tx_in_id",
    333         params,
    334         listen,
    335         || {
    336             QueryBuilder::new(
    337                 "
    338                  SELECT
    339                     tx_in_id,
    340                     amount,
    341                     created_at,
    342                     debit_payto,
    343                     subject
    344                 FROM tx_in
    345                 WHERE
    346             ",
    347             )
    348         },
    349         |r: PgRow| {
    350             Ok(RevenueIncomingBankTransaction {
    351                 row_id: r.try_get_safeu64("tx_in_id")?,
    352                 date: r.try_get_timestamp("created_at")?.into(),
    353                 amount: r.try_get_amount("amount", currency)?,
    354                 credit_fee: None,
    355                 debit_account: r.try_get_payto("debit_payto")?,
    356                 subject: r.try_get("subject")?,
    357             })
    358         },
    359     )
    360     .await
    361 }
    362 
/// Outcome of [`transfer_register`], decoded from the `register_prepared_transfers` SQL function.
pub enum RegistrationResult {
    /// The database did not set `out_reserve_pub_reuse`.
    Success,
    /// The database set `out_reserve_pub_reuse`.
    ReservePubReuse,
}
    367 
    368 pub async fn transfer_register(
    369     db: &PgPool,
    370     req: &RegistrationRequest,
    371 ) -> sqlx::Result<RegistrationResult> {
    372     let ty: IncomingType = req.r#type.into();
    373     serialized!(
    374         sqlx::query(
    375             "SELECT out_reserve_pub_reuse FROM register_prepared_transfers($1,$2,$3,$4,$5,$6)"
    376         )
    377         .bind(ty)
    378         .bind(&req.account_pub)
    379         .bind(&req.authorization_pub)
    380         .bind(&req.authorization_sig)
    381         .bind(req.recurrent)
    382         .bind_timestamp(&Timestamp::now())
    383         .try_map(|r: PgRow| {
    384             Ok(if r.try_get_flag("out_reserve_pub_reuse")? {
    385                 RegistrationResult::ReservePubReuse
    386             } else {
    387                 RegistrationResult::Success
    388             })
    389         })
    390         .fetch_one(db)
    391     )
    392 }
    393 
    394 pub async fn transfer_unregister(db: &PgPool, req: &Unregistration) -> sqlx::Result<bool> {
    395     serialized!(
    396         sqlx::query("SELECT out_found FROM delete_prepared_transfers($1,$2)")
    397             .bind(&req.authorization_pub)
    398             .bind_timestamp(&Timestamp::now())
    399             .try_map(|r: PgRow| r.try_get_flag("out_found"))
    400             .fetch_one(db)
    401     )
    402 }