taler-rust

GNU Taler code in Rust. Largely core banking integrations.
Log | Files | Refs | Submodules | README | LICENSE

db.rs (12631B)


      1 /*
      2   This file is part of TALER
      3   Copyright (C) 2024, 2025, 2026 Taler Systems SA
      4 
      5   TALER is free software; you can redistribute it and/or modify it under the
      6   terms of the GNU Affero General Public License as published by the Free Software
      7   Foundation; either version 3, or (at your option) any later version.
      8 
      9   TALER is distributed in the hope that it will be useful, but WITHOUT ANY
     10   WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
     11   A PARTICULAR PURPOSE.  See the GNU Affero General Public License for more details.
     12 
     13   You should have received a copy of the GNU Affero General Public License along with
     14   TALER; see the file COPYING.  If not, see <http://www.gnu.org/licenses/>
     15 */
     16 
     17 use jiff::Timestamp;
     18 use sqlx::{PgPool, QueryBuilder, Row, postgres::PgRow};
     19 use taler_api::{
     20     db::{BindHelper, TypeHelper, history, page},
     21     serialized,
     22     subject::fmt_out_subject,
     23 };
     24 use taler_common::{
     25     api_common::{EddsaPublicKey, SafeU64},
     26     api_params::{History, Page},
     27     api_revenue::RevenueIncomingBankTransaction,
     28     api_transfer::{RegistrationRequest, Unregistration},
     29     api_wire::{
     30         IncomingBankTransaction, OutgoingBankTransaction, TransferListStatus, TransferRequest,
     31         TransferResponse, TransferState, TransferStatus,
     32     },
     33     db::IncomingType,
     34     types::{
     35         amount::{Amount, Currency},
     36         payto::PaytoURI,
     37     },
     38 };
     39 use tokio::sync::watch::{Receiver, Sender};
     40 
     41 pub async fn notification_listener(
     42     pool: PgPool,
     43     outgoing_channel: Sender<i64>,
     44     incoming_channel: Sender<i64>,
     45 ) -> sqlx::Result<()> {
     46     taler_api::notification::notification_listener!(&pool,
     47         "outgoing_tx" => (row_id: i64) {
     48             outgoing_channel.send_replace(row_id);
     49         },
     50         "incoming_tx" => (row_id: i64) {
     51             incoming_channel.send_replace(row_id);
     52         }
     53     )
     54 }
     55 
     56 pub enum TransferResult {
     57     Success(TransferResponse),
     58     RequestUidReuse,
     59     WtidReuse,
     60 }
     61 
     62 pub async fn transfer(db: &PgPool, req: &TransferRequest) -> sqlx::Result<TransferResult> {
     63     let subject = fmt_out_subject(&req.wtid, &req.exchange_base_url, req.metadata.as_deref());
     64     serialized!(
     65         sqlx::query(
     66             "
     67             SELECT out_request_uid_reuse, out_wtid_reuse, out_transfer_row_id, out_created_at
     68             FROM taler_transfer($1,$2,$3,$4,$5,$6,$7,$8)
     69         ",
     70         )
     71         .bind(&req.amount)
     72         .bind(req.exchange_base_url.as_str())
     73         .bind(&req.metadata)
     74         .bind(&subject)
     75         .bind(req.credit_account.raw())
     76         .bind(&req.request_uid)
     77         .bind(&req.wtid)
     78         .bind_timestamp(&Timestamp::now())
     79         .try_map(|r: PgRow| {
     80             Ok(if r.try_get_flag("out_request_uid_reuse")? {
     81                 TransferResult::RequestUidReuse
     82             } else if r.try_get_flag("out_wtid_reuse")? {
     83                 TransferResult::WtidReuse
     84             } else {
     85                 TransferResult::Success(TransferResponse {
     86                     row_id: r.try_get_safeu64("out_transfer_row_id")?,
     87                     timestamp: r.try_get_timestamp("out_created_at")?.into(),
     88                 })
     89             })
     90         })
     91         .fetch_one(db)
     92     )
     93 }
     94 
     95 pub async fn transfer_page(
     96     db: &PgPool,
     97     status: &Option<TransferState>,
     98     params: &Page,
     99     currency: &Currency,
    100 ) -> sqlx::Result<Vec<TransferListStatus>> {
    101     page(
    102         db,
    103         "transfer_id",
    104         params,
    105         || {
    106             let mut builder = QueryBuilder::new(
    107                 "
    108                     SELECT
    109                         transfer_id,
    110                         status,
    111                         amount,
    112                         credit_payto,
    113                         created_at
    114                     FROM transfer 
    115                     JOIN tx_out USING (tx_out_id)
    116                     WHERE
    117                 ",
    118             );
    119             if let Some(status) = status {
    120                 builder.push(" status = ").push_bind(status).push(" AND ");
    121             }
    122             builder
    123         },
    124         |r: PgRow| {
    125             Ok(TransferListStatus {
    126                 row_id: r.try_get_safeu64("transfer_id")?,
    127                 status: r.try_get("status")?,
    128                 amount: r.try_get_amount("amount", currency)?,
    129                 credit_account: r.try_get_payto("credit_payto")?,
    130                 timestamp: r.try_get_timestamp("created_at")?.into(),
    131             })
    132         },
    133     )
    134     .await
    135 }
    136 
    137 pub async fn transfer_by_id(
    138     db: &PgPool,
    139     id: u64,
    140     currency: &Currency,
    141 ) -> sqlx::Result<Option<TransferStatus>> {
    142     serialized!(
    143         sqlx::query(
    144             "
    145             SELECT
    146                 status,
    147                 status_msg,
    148                 amount,
    149                 exchange_base_url,
    150                 metadata,
    151                 wtid,
    152                 credit_payto,
    153                 created_at
    154             FROM transfer
    155             JOIN tx_out USING (tx_out_id)
    156             WHERE transfer_id = $1
    157         ",
    158         )
    159         .bind(id as i64)
    160         .try_map(|r: PgRow| {
    161             Ok(TransferStatus {
    162                 status: r.try_get("status")?,
    163                 status_msg: r.try_get("status_msg")?,
    164                 amount: r.try_get_amount("amount", currency)?,
    165                 origin_exchange_url: r.try_get("exchange_base_url")?,
    166                 metadata: r.try_get("metadata")?,
    167                 wtid: r.try_get("wtid")?,
    168                 credit_account: r.try_get_payto("credit_payto")?,
    169                 timestamp: r.try_get_timestamp("created_at")?.into(),
    170             })
    171         })
    172         .fetch_optional(db)
    173     )
    174 }
    175 
    176 pub async fn outgoing_revenue(
    177     db: &PgPool,
    178     params: &History,
    179     currency: &Currency,
    180     listen: impl FnOnce() -> Receiver<i64>,
    181 ) -> sqlx::Result<Vec<OutgoingBankTransaction>> {
    182     history(
    183         db,
    184         "transfer_id",
    185         params,
    186         listen,
    187         || {
    188             QueryBuilder::new(
    189                 "
    190                 SELECT
    191                     transfer_id,
    192                     amount,
    193                     exchange_base_url,
    194                     metadata,
    195                     wtid,
    196                     credit_payto,
    197                     created_at
    198                 FROM transfer
    199                 JOIN tx_out USING (tx_out_id)
    200                 WHERE status = 'success' AND
    201             ",
    202             )
    203         },
    204         |r| {
    205             Ok(OutgoingBankTransaction {
    206                 amount: r.try_get_amount("amount", currency)?,
    207                 debit_fee: None,
    208                 wtid: r.try_get("wtid")?,
    209                 credit_account: r.try_get_payto("credit_payto")?,
    210                 row_id: r.try_get_safeu64("transfer_id")?,
    211                 date: r.try_get_timestamp("created_at")?.into(),
    212                 exchange_base_url: r.try_get_url("exchange_base_url")?,
    213                 metadata: r.try_get("metadata")?,
    214             })
    215         },
    216     )
    217     .await
    218 }
    219 
    220 pub enum AddIncomingResult {
    221     Success { id: SafeU64, created_at: Timestamp },
    222     ReservePubReuse,
    223     UnknownMapping,
    224     MappingReuse,
    225 }
    226 
    227 pub async fn add_incoming(
    228     db: &PgPool,
    229     amount: &Amount,
    230     debit_account: &PaytoURI,
    231     subject: &str,
    232     timestamp: &Timestamp,
    233     ty: IncomingType,
    234     account_pub: &EddsaPublicKey,
    235 ) -> sqlx::Result<AddIncomingResult> {
    236     serialized!(
    237         sqlx::query(
    238             "SELECT out_reserve_pub_reuse, out_mapping_reuse, out_unknown_mapping, out_tx_row_id, out_created_at
    239             FROM add_incoming($1,$2,$3,$4,$5,$6)",
    240         )
    241         .bind(amount)
    242         .bind(subject)
    243         .bind(debit_account.raw())
    244         .bind(ty)
    245         .bind(account_pub)
    246         .bind_timestamp(timestamp)
    247         .try_map(|r: PgRow| {
    248             Ok(if r.try_get_flag("out_reserve_pub_reuse")? {
    249                 AddIncomingResult::ReservePubReuse
    250             } else if r.try_get_flag("out_mapping_reuse")? {
    251                 AddIncomingResult::MappingReuse
    252             } else if r.try_get_flag("out_unknown_mapping")? {
    253                 AddIncomingResult::UnknownMapping
    254             } else{
    255                 AddIncomingResult::Success {
    256                     id: r.try_get_safeu64("out_tx_row_id")?,
    257                     created_at: r.try_get_timestamp("out_created_at")?,
    258                 }
    259             })
    260         })
    261         .fetch_one(db)
    262     )
    263 }
    264 
    265 pub async fn incoming_history(
    266     db: &PgPool,
    267     params: &History,
    268     currency: &Currency,
    269     listen: impl FnOnce() -> Receiver<i64>,
    270 ) -> sqlx::Result<Vec<IncomingBankTransaction>> {
    271     history(
    272         db,
    273         "tx_in_id",
    274         params,
    275         listen,
    276         || {
    277             QueryBuilder::new(
    278                 "
    279                  SELECT
    280                     type,
    281                     tx_in_id,
    282                     amount,
    283                     created_at,
    284                     debit_payto,
    285                     account_pub,
    286                     authorization_pub,
    287                     authorization_sig
    288                 FROM tx_in
    289                 JOIN taler_in USING (tx_in_id)
    290                 WHERE
    291             ",
    292             )
    293         },
    294         |r: PgRow| {
    295             Ok(match r.try_get("type")? {
    296                 IncomingType::reserve => IncomingBankTransaction::Reserve {
    297                     row_id: r.try_get_safeu64("tx_in_id")?,
    298                     date: r.try_get_timestamp("created_at")?.into(),
    299                     amount: r.try_get_amount("amount", currency)?,
    300                     credit_fee: None,
    301                     debit_account: r.try_get_payto("debit_payto")?,
    302                     reserve_pub: r.try_get("account_pub")?,
    303                     authorization_pub: r.try_get("authorization_pub")?,
    304                     authorization_sig: r.try_get("authorization_sig")?,
    305                 },
    306                 IncomingType::kyc => IncomingBankTransaction::Kyc {
    307                     row_id: r.try_get_safeu64("tx_in_id")?,
    308                     date: r.try_get_timestamp("created_at")?.into(),
    309                     amount: r.try_get_amount("amount", currency)?,
    310                     credit_fee: None,
    311                     debit_account: r.try_get_payto("debit_payto")?,
    312                     account_pub: r.try_get("account_pub")?,
    313                     authorization_pub: r.try_get("authorization_pub")?,
    314                     authorization_sig: r.try_get("authorization_sig")?,
    315                 },
    316                 IncomingType::map => unimplemented!("MAP are never listed in the history"),
    317             })
    318         },
    319     )
    320     .await
    321 }
    322 
    323 pub async fn revenue_history(
    324     db: &PgPool,
    325     params: &History,
    326     currency: &Currency,
    327     listen: impl FnOnce() -> Receiver<i64>,
    328 ) -> sqlx::Result<Vec<RevenueIncomingBankTransaction>> {
    329     history(
    330         db,
    331         "tx_in_id",
    332         params,
    333         listen,
    334         || {
    335             QueryBuilder::new(
    336                 "
    337                  SELECT
    338                     tx_in_id,
    339                     amount,
    340                     created_at,
    341                     debit_payto,
    342                     subject
    343                 FROM tx_in
    344                 WHERE
    345             ",
    346             )
    347         },
    348         |r: PgRow| {
    349             Ok(RevenueIncomingBankTransaction {
    350                 row_id: r.try_get_safeu64("tx_in_id")?,
    351                 date: r.try_get_timestamp("created_at")?.into(),
    352                 amount: r.try_get_amount("amount", currency)?,
    353                 credit_fee: None,
    354                 debit_account: r.try_get_payto("debit_payto")?,
    355                 subject: r.try_get("subject")?,
    356             })
    357         },
    358     )
    359     .await
    360 }
    361 
    362 pub enum RegistrationResult {
    363     Success,
    364     ReservePubReuse,
    365 }
    366 
    367 pub async fn transfer_register(
    368     db: &PgPool,
    369     req: &RegistrationRequest,
    370 ) -> sqlx::Result<RegistrationResult> {
    371     let ty: IncomingType = req.r#type.into();
    372     serialized!(
    373         sqlx::query(
    374             "SELECT out_reserve_pub_reuse FROM register_prepared_transfers($1,$2,$3,$4,$5,$6)"
    375         )
    376         .bind(ty)
    377         .bind(&req.account_pub)
    378         .bind(&req.authorization_pub)
    379         .bind(&req.authorization_sig)
    380         .bind(req.recurrent)
    381         .bind_timestamp(&Timestamp::now())
    382         .try_map(|r: PgRow| {
    383             Ok(if r.try_get_flag("out_reserve_pub_reuse")? {
    384                 RegistrationResult::ReservePubReuse
    385             } else {
    386                 RegistrationResult::Success
    387             })
    388         })
    389         .fetch_one(db)
    390     )
    391 }
    392 
    393 pub async fn transfer_unregister(db: &PgPool, req: &Unregistration) -> sqlx::Result<bool> {
    394     serialized!(
    395         sqlx::query("SELECT out_found FROM delete_prepared_transfers($1,$2)")
    396             .bind(&req.authorization_pub)
    397             .bind_timestamp(&Timestamp::now())
    398             .try_map(|r: PgRow| r.try_get_flag("out_found"))
    399             .fetch_one(db)
    400     )
    401 }