taler-rust

GNU Taler code in Rust. Largely core banking integrations.
Log | Files | Refs | Submodules | README | LICENSE

db.rs (4760B)


      1 /*
      2   This file is part of TALER
      3   Copyright (C) 2025, 2026 Taler Systems SA
      4 
      5   TALER is free software; you can redistribute it and/or modify it under the
      6   terms of the GNU Affero General Public License as published by the Free Software
      7   Foundation; either version 3, or (at your option) any later version.
      8 
      9   TALER is distributed in the hope that it will be useful, but WITHOUT ANY
     10   WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
     11   A PARTICULAR PURPOSE.  See the GNU Affero General Public License for more details.
     12 
     13   You should have received a copy of the GNU Affero General Public License along with
     14   TALER; see the file COPYING.  If not, see <http://www.gnu.org/licenses/>
     15 */
     16 
     17 use std::{
     18     io::ErrorKind,
     19     path::{Path, PathBuf},
     20     sync::Arc,
     21 };
     22 
     23 use sqlx::{
     24     Connection, Executor, PgConnection, PgPool, Postgres, Row, Transaction,
     25     postgres::{PgConnectOptions, PgPoolOptions, PgRow},
     26 };
     27 use taler_macros::EnumMeta;
     28 use tracing::{debug, info};
     29 
     30 #[derive(Debug, Clone, Copy, PartialEq, Eq, sqlx::Type, EnumMeta)]
     31 #[allow(non_camel_case_types)]
     32 #[sqlx(type_name = "incoming_type")]
     33 #[enum_meta(Str)]
     34 pub enum IncomingType {
     35     reserve,
     36     kyc,
     37     map,
     38 }
     39 
     40 /* ----- Pool ----- */
     41 
     42 pub async fn pool(cfg: PgConnectOptions, schema: &str) -> sqlx::Result<PgPool> {
     43     let init_sql = Arc::new(format!(
     44         "SET search_path TO {schema};SET default_transaction_isolation = 'serializable';"
     45     ));
     46     let pool = PgPoolOptions::new()
     47         .after_connect(move |conn, _meta| {
     48             let init_sql = init_sql.clone();
     49             Box::pin(async move {
     50                 conn.execute(init_sql.as_str()).await?;
     51 
     52                 Ok(())
     53             })
     54         })
     55         .connect_with(cfg)
     56         .await?;
     57 
     58     // TODO check postgresql version ?
     59 
     60     Ok(pool)
     61 }
     62 
     63 /* ----- Migration----- */
     64 
     65 #[derive(Debug, thiserror::Error)]
     66 pub enum MigrationErr {
     67     #[error("could not read patch at '{}': {}", .0.to_string_lossy(), .1.kind())]
     68     Io(PathBuf, std::io::Error),
     69     #[error(transparent)]
     70     Sql(#[from] sqlx::Error),
     71 }
     72 
     73 pub async fn dbinit(
     74     conn: &mut PgConnection,
     75     sql_dir: &Path,
     76     prefix: &str,
     77     reset: bool,
     78 ) -> anyhow::Result<()> {
     79     dbinit_setup(conn, sql_dir, prefix, reset, async |_| Ok(())).await
     80 }
     81 
     82 pub async fn dbinit_setup(
     83     conn: &mut PgConnection,
     84     sql_dir: &Path,
     85     prefix: &str,
     86     reset: bool,
     87     setup: impl AsyncFn(&mut Transaction<Postgres>) -> anyhow::Result<()>,
     88 ) -> anyhow::Result<()> {
     89     let mut tx = conn.begin().await?;
     90 
     91     let exec_sql_file =
     92         async |conn: &mut PgConnection, file: &str, action: &str| -> Result<(), MigrationErr> {
     93             let path = sql_dir.join(file);
     94             match std::fs::read_to_string(&path) {
     95                 Ok(content) => {
     96                     info!(target: "dbinit", "applying {action}");
     97                     sqlx::raw_sql(&content).execute(conn).await?;
     98                     Ok(())
     99                 }
    100                 Err(e) => Err(MigrationErr::Io(path, e)),
    101             }
    102         };
    103 
    104     if reset {
    105         info!(target: "dbinit", "reset, sqlqir '{}'", sql_dir.to_string_lossy());
    106         exec_sql_file(&mut *tx, &format!("{prefix}-drop.sql"), "drop").await?;
    107     }
    108 
    109     info!(target: "dbinit", "initialization, sqlqir '{}'", sql_dir.to_string_lossy());
    110 
    111     exec_sql_file(&mut *tx, "versioning.sql", "versioning").await?;
    112 
    113     let applied: Vec<String> = sqlx::query("SELECT patch_name FROM _v.patches")
    114         .try_map(|r: PgRow| r.try_get(0))
    115         .fetch_all(&mut *tx)
    116         .await?;
    117     for n in 1..9999 {
    118         let patch = format!("{prefix}-{n:0>4}");
    119         if applied.contains(&patch) {
    120             debug!(target: "dbinit", "patch {patch} already applied");
    121             continue;
    122         }
    123 
    124         if let Err(e) =
    125             exec_sql_file(&mut *tx, &format!("{patch}.sql"), &format!("patch {patch}")).await
    126         {
    127             if let MigrationErr::Io(path, e) = &e
    128                 && e.kind() == ErrorKind::NotFound
    129             {
    130                 debug!(
    131                     target: "dbinit",
    132                     "path '{}' doesn't exist anymore, stopping",
    133                     path.to_string_lossy()
    134                 );
    135                 break;
    136             }
    137             return Err(e.into());
    138         }
    139     }
    140 
    141     if let Err(e) = exec_sql_file(&mut *tx, &format!("{prefix}-procedures.sql"), "procedures").await
    142     {
    143         if let MigrationErr::Io(_, e) = &e
    144             && e.kind() == ErrorKind::NotFound
    145         {
    146             debug!(
    147                 target: "dbinit",
    148                 "no procedures.sql for the SQL collection: '{prefix}'"
    149             );
    150         } else {
    151             return Err(e.into());
    152         }
    153     }
    154 
    155     setup(&mut tx).await?;
    156 
    157     tx.commit().await?;
    158 
    159     Ok(())
    160 }