taler-rust

GNU Taler code in Rust. Largely core banking integrations.
Log | Files | Refs | Submodules | README | LICENSE

db.rs (12593B)


      1 /*
      2   This file is part of TALER
      3   Copyright (C) 2024, 2025, 2026 Taler Systems SA
      4 
      5   TALER is free software; you can redistribute it and/or modify it under the
      6   terms of the GNU Affero General Public License as published by the Free Software
      7   Foundation; either version 3, or (at your option) any later version.
      8 
      9   TALER is distributed in the hope that it will be useful, but WITHOUT ANY
     10   WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
     11   A PARTICULAR PURPOSE.  See the GNU Affero General Public License for more details.
     12 
     13   You should have received a copy of the GNU Affero General Public License along with
     14   TALER; see the file COPYING.  If not, see <http://www.gnu.org/licenses/>
     15 */
     16 
     17 use jiff::Timestamp;
     18 use sqlx::{PgPool, QueryBuilder, Row, postgres::PgRow};
     19 use taler_common::{
     20     api::{
     21         EddsaPublicKey,
     22         params::{History, Page},
     23         prepared::{RegistrationRequest, Unregistration},
     24         revenue::RevenueIncomingBankTransaction,
     25         wire::{
     26             IncomingBankTransaction, OutgoingBankTransaction, TransferListStatus, TransferRequest,
     27             TransferResponse, TransferState, TransferStatus,
     28         },
     29     },
     30     db::IncomingType,
     31     types::{
     32         amount::{Amount, Currency},
     33         payto::PaytoURI,
     34     },
     35 };
     36 use tokio::sync::watch::{Receiver, Sender};
     37 
     38 use crate::{
     39     db::{BindHelper as _, TypeHelper as _, history, page},
     40     notification_listener, serialized,
     41     subject::fmt_out_subject,
     42 };
     43 
     44 pub async fn notification_listener(
     45     pool: PgPool,
     46     outgoing_channel: Sender<i64>,
     47     incoming_channel: Sender<i64>,
     48 ) {
     49     notification_listener!(&pool,
     50         "outgoing_tx" => (row_id: i64) {
     51             outgoing_channel.send_replace(row_id);
     52         },
     53         "incoming_tx" => (row_id: i64) {
     54             incoming_channel.send_replace(row_id);
     55         }
     56     )
     57 }
     58 
     59 pub enum TransferResult {
     60     Success(TransferResponse),
     61     RequestUidReuse,
     62     WtidReuse,
     63 }
     64 
     65 pub async fn transfer(db: &PgPool, req: &TransferRequest) -> sqlx::Result<TransferResult> {
     66     let subject = fmt_out_subject(&req.wtid, &req.exchange_base_url, req.metadata.as_deref());
     67     serialized!(
     68         sqlx::query(
     69             "
     70             SELECT out_request_uid_reuse, out_wtid_reuse, out_transfer_row_id, out_created_at
     71             FROM taler_transfer($1,$2,$3,$4,$5,$6,$7,$8)
     72         ",
     73         )
     74         .bind(req.amount)
     75         .bind(req.exchange_base_url.as_str())
     76         .bind(&req.metadata)
     77         .bind(&subject)
     78         .bind(req.credit_account.raw())
     79         .bind(&req.request_uid)
     80         .bind(&req.wtid)
     81         .bind_timestamp(&Timestamp::now())
     82         .try_map(|r: PgRow| {
     83             Ok(if r.try_get_flag("out_request_uid_reuse")? {
     84                 TransferResult::RequestUidReuse
     85             } else if r.try_get_flag("out_wtid_reuse")? {
     86                 TransferResult::WtidReuse
     87             } else {
     88                 TransferResult::Success(TransferResponse {
     89                     row_id: r.try_get_u64("out_transfer_row_id")?,
     90                     timestamp: r.try_get_timestamp("out_created_at")?.into(),
     91                 })
     92             })
     93         })
     94         .fetch_one(db)
     95     )
     96 }
     97 
     98 pub async fn transfer_page(
     99     db: &PgPool,
    100     status: &Option<TransferState>,
    101     params: &Page,
    102     currency: &Currency,
    103 ) -> sqlx::Result<Vec<TransferListStatus>> {
    104     page(
    105         db,
    106         params,
    107         "transfer_id",
    108         || {
    109             let mut builder = QueryBuilder::new(
    110                 "
    111                     SELECT
    112                         transfer_id,
    113                         status,
    114                         amount,
    115                         credit_payto,
    116                         created_at
    117                     FROM transfer
    118                     JOIN tx_out USING (tx_out_id)
    119                     WHERE
    120                 ",
    121             );
    122             if let Some(status) = status {
    123                 builder.push(" status = ").push_bind(status).push(" AND ");
    124             }
    125             builder
    126         },
    127         |r: PgRow| {
    128             Ok(TransferListStatus {
    129                 row_id: r.try_get_u64("transfer_id")?,
    130                 status: r.try_get("status")?,
    131                 amount: r.try_get_amount("amount", currency)?,
    132                 credit_account: r.try_get_payto("credit_payto")?,
    133                 timestamp: r.try_get_timestamp("created_at")?.into(),
    134             })
    135         },
    136     )
    137     .await
    138 }
    139 
    140 pub async fn transfer_by_id(
    141     db: &PgPool,
    142     id: u64,
    143     currency: &Currency,
    144 ) -> sqlx::Result<Option<TransferStatus>> {
    145     serialized!(
    146         sqlx::query(
    147             "
    148             SELECT
    149                 status,
    150                 status_msg,
    151                 amount,
    152                 exchange_base_url,
    153                 metadata,
    154                 wtid,
    155                 credit_payto,
    156                 created_at
    157             FROM transfer
    158             JOIN tx_out USING (tx_out_id)
    159             WHERE transfer_id = $1
    160         ",
    161         )
    162         .bind(id as i64)
    163         .try_map(|r: PgRow| {
    164             Ok(TransferStatus {
    165                 status: r.try_get("status")?,
    166                 status_msg: r.try_get("status_msg")?,
    167                 amount: r.try_get_amount("amount", currency)?,
    168                 origin_exchange_url: r.try_get("exchange_base_url")?,
    169                 metadata: r.try_get("metadata")?,
    170                 wtid: r.try_get("wtid")?,
    171                 credit_account: r.try_get_payto("credit_payto")?,
    172                 timestamp: r.try_get_timestamp("created_at")?.into(),
    173             })
    174         })
    175         .fetch_optional(db)
    176     )
    177 }
    178 
    179 pub async fn outgoing_revenue(
    180     db: &PgPool,
    181     params: &History,
    182     currency: &Currency,
    183     listen: impl FnOnce() -> Receiver<i64>,
    184 ) -> sqlx::Result<Vec<OutgoingBankTransaction>> {
    185     history(
    186         db,
    187         "transfer_id",
    188         params,
    189         listen,
    190         || {
    191             QueryBuilder::new(
    192                 "
    193                 SELECT
    194                     transfer_id,
    195                     amount,
    196                     exchange_base_url,
    197                     metadata,
    198                     wtid,
    199                     credit_payto,
    200                     created_at
    201                 FROM transfer
    202                 JOIN tx_out USING (tx_out_id)
    203                 WHERE status = 'success' AND
    204             ",
    205             )
    206         },
    207         |r| {
    208             Ok(OutgoingBankTransaction {
    209                 amount: r.try_get_amount("amount", currency)?,
    210                 debit_fee: None,
    211                 wtid: r.try_get("wtid")?,
    212                 credit_account: r.try_get_payto("credit_payto")?,
    213                 row_id: r.try_get_u64("transfer_id")?,
    214                 date: r.try_get_timestamp("created_at")?.into(),
    215                 exchange_base_url: r.try_get_url("exchange_base_url")?,
    216                 metadata: r.try_get("metadata")?,
    217             })
    218         },
    219     )
    220     .await
    221 }
    222 
    223 pub enum AddIncomingResult {
    224     Success { id: u64, created_at: Timestamp },
    225     ReservePubReuse,
    226     UnknownMapping,
    227     MappingReuse,
    228 }
    229 
    230 pub async fn add_incoming(
    231     db: &PgPool,
    232     amount: &Amount,
    233     debit_account: &PaytoURI,
    234     subject: &str,
    235     timestamp: &Timestamp,
    236     ty: IncomingType,
    237     account_pub: &EddsaPublicKey,
    238 ) -> sqlx::Result<AddIncomingResult> {
    239     serialized!(
    240         sqlx::query(
    241             "SELECT out_reserve_pub_reuse, out_mapping_reuse, out_unknown_mapping, out_tx_row_id, out_created_at
    242             FROM add_incoming($1,$2,$3,$4,$5,$6)",
    243         )
    244         .bind(amount)
    245         .bind(subject)
    246         .bind(debit_account.raw())
    247         .bind(ty)
    248         .bind(account_pub)
    249         .bind_timestamp(timestamp)
    250         .try_map(|r: PgRow| {
    251             Ok(if r.try_get_flag("out_reserve_pub_reuse")? {
    252                 AddIncomingResult::ReservePubReuse
    253             } else if r.try_get_flag("out_mapping_reuse")? {
    254                 AddIncomingResult::MappingReuse
    255             } else if r.try_get_flag("out_unknown_mapping")? {
    256                 AddIncomingResult::UnknownMapping
    257             } else{
    258                 AddIncomingResult::Success {
    259                     id: r.try_get_u64("out_tx_row_id")?,
    260                     created_at: r.try_get_timestamp("out_created_at")?,
    261                 }
    262             })
    263         })
    264         .fetch_one(db)
    265     )
    266 }
    267 
    268 pub async fn incoming_history(
    269     db: &PgPool,
    270     params: &History,
    271     currency: &Currency,
    272     listen: impl FnOnce() -> Receiver<i64>,
    273 ) -> sqlx::Result<Vec<IncomingBankTransaction>> {
    274     history(
    275         db,
    276         "tx_in_id",
    277         params,
    278         listen,
    279         || {
    280             QueryBuilder::new(
    281                 "
    282                  SELECT
    283                     type,
    284                     tx_in_id,
    285                     amount,
    286                     created_at,
    287                     debit_payto,
    288                     account_pub,
    289                     authorization_pub,
    290                     authorization_sig
    291                 FROM tx_in
    292                 JOIN taler_in USING (tx_in_id)
    293                 WHERE
    294             ",
    295             )
    296         },
    297         |r: PgRow| {
    298             Ok(match r.try_get("type")? {
    299                 IncomingType::reserve => IncomingBankTransaction::Reserve {
    300                     row_id: r.try_get_u64("tx_in_id")?,
    301                     date: r.try_get_timestamp("created_at")?.into(),
    302                     amount: r.try_get_amount("amount", currency)?,
    303                     credit_fee: None,
    304                     debit_account: r.try_get_payto("debit_payto")?,
    305                     reserve_pub: r.try_get("account_pub")?,
    306                     authorization_pub: r.try_get("authorization_pub")?,
    307                     authorization_sig: r.try_get("authorization_sig")?,
    308                 },
    309                 IncomingType::kyc => IncomingBankTransaction::Kyc {
    310                     row_id: r.try_get_u64("tx_in_id")?,
    311                     date: r.try_get_timestamp("created_at")?.into(),
    312                     amount: r.try_get_amount("amount", currency)?,
    313                     credit_fee: None,
    314                     debit_account: r.try_get_payto("debit_payto")?,
    315                     account_pub: r.try_get("account_pub")?,
    316                     authorization_pub: r.try_get("authorization_pub")?,
    317                     authorization_sig: r.try_get("authorization_sig")?,
    318                 },
    319                 IncomingType::map => unimplemented!("MAP are never listed in the history"),
    320             })
    321         },
    322     )
    323     .await
    324 }
    325 
    326 pub async fn revenue_history(
    327     db: &PgPool,
    328     params: &History,
    329     currency: &Currency,
    330     listen: impl FnOnce() -> Receiver<i64>,
    331 ) -> sqlx::Result<Vec<RevenueIncomingBankTransaction>> {
    332     history(
    333         db,
    334         "tx_in_id",
    335         params,
    336         listen,
    337         || {
    338             QueryBuilder::new(
    339                 "
    340                  SELECT
    341                     tx_in_id,
    342                     amount,
    343                     created_at,
    344                     debit_payto,
    345                     subject
    346                 FROM tx_in
    347                 WHERE
    348             ",
    349             )
    350         },
    351         |r: PgRow| {
    352             Ok(RevenueIncomingBankTransaction {
    353                 row_id: r.try_get_u64("tx_in_id")?,
    354                 date: r.try_get_timestamp("created_at")?.into(),
    355                 amount: r.try_get_amount("amount", currency)?,
    356                 credit_fee: None,
    357                 debit_account: r.try_get_payto("debit_payto")?,
    358                 subject: r.try_get("subject")?,
    359             })
    360         },
    361     )
    362     .await
    363 }
    364 
    365 pub enum RegistrationResult {
    366     Success,
    367     ReservePubReuse,
    368 }
    369 
    370 pub async fn transfer_register(
    371     db: &PgPool,
    372     req: &RegistrationRequest,
    373 ) -> sqlx::Result<RegistrationResult> {
    374     let ty: IncomingType = req.r#type.into();
    375     serialized!(
    376         sqlx::query(
    377             "SELECT out_reserve_pub_reuse FROM register_prepared_transfers($1,$2,$3,$4,$5,$6)"
    378         )
    379         .bind(ty)
    380         .bind(&req.account_pub)
    381         .bind(&req.authorization_pub)
    382         .bind(&req.authorization_sig)
    383         .bind(req.recurrent)
    384         .bind_timestamp(&Timestamp::now())
    385         .try_map(|r: PgRow| {
    386             Ok(if r.try_get_flag("out_reserve_pub_reuse")? {
    387                 RegistrationResult::ReservePubReuse
    388             } else {
    389                 RegistrationResult::Success
    390             })
    391         })
    392         .fetch_one(db)
    393     )
    394 }
    395 
    396 pub async fn transfer_unregister(db: &PgPool, req: &Unregistration) -> sqlx::Result<bool> {
    397     serialized!(
    398         sqlx::query("SELECT out_found FROM delete_prepared_transfers($1,$2)")
    399             .bind(&req.authorization_pub)
    400             .bind_timestamp(&Timestamp::now())
    401             .try_map(|r: PgRow| r.try_get_flag("out_found"))
    402             .fetch_one(db)
    403     )
    404 }