taler-rust

GNU Taler code in Rust. Largely core banking integrations.
Log | Files | Refs | Submodules | README | LICENSE

db.rs (4095B)


      1 /*
      2   This file is part of TALER
      3   Copyright (C) 2025, 2026 Taler Systems SA
      4 
      5   TALER is free software; you can redistribute it and/or modify it under the
      6   terms of the GNU Affero General Public License as published by the Free Software
      7   Foundation; either version 3, or (at your option) any later version.
      8 
      9   TALER is distributed in the hope that it will be useful, but WITHOUT ANY
     10   WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
     11   A PARTICULAR PURPOSE.  See the GNU Affero General Public License for more details.
     12 
     13   You should have received a copy of the GNU Affero General Public License along with
     14   TALER; see the file COPYING.  If not, see <http://www.gnu.org/licenses/>
     15 */
     16 
     17 use std::{
     18     io::ErrorKind,
     19     path::{Path, PathBuf},
     20     sync::Arc,
     21 };
     22 
     23 use sqlx::{
     24     Connection, Executor, PgConnection, PgPool, Row,
     25     postgres::{PgConnectOptions, PgPoolOptions, PgRow},
     26 };
     27 use taler_macros::EnumMeta;
     28 use tracing::{debug, info};
     29 
     30 #[derive(Debug, Clone, Copy, PartialEq, Eq, sqlx::Type, EnumMeta)]
     31 #[allow(non_camel_case_types)]
     32 #[sqlx(type_name = "incoming_type")]
     33 #[enum_meta(Str)]
     34 pub enum IncomingType {
     35     reserve,
     36     kyc,
     37     map,
     38 }
     39 
     40 /* ----- Pool ----- */
     41 
     42 pub async fn pool(cfg: PgConnectOptions, schema: &str) -> sqlx::Result<PgPool> {
     43     let init_sql = Arc::new(format!(
     44         "SET search_path TO {schema};SET default_transaction_isolation = 'serializable';"
     45     ));
     46     let pool = PgPoolOptions::new()
     47         .after_connect(move |conn, _meta| {
     48             let init_sql = init_sql.clone();
     49             Box::pin(async move {
     50                 conn.execute(init_sql.as_str()).await?;
     51 
     52                 Ok(())
     53             })
     54         })
     55         .connect_with(cfg)
     56         .await?;
     57 
     58     // TODO check postgresql version ?
     59 
     60     Ok(pool)
     61 }
     62 
     63 /* ----- Migration ----- */
     64 
     65 #[derive(Debug, thiserror::Error)]
     66 pub enum MigrationErr {
     67     #[error("could not read patch at '{}': {}", .0.to_string_lossy(), .1.kind())]
     68     Io(PathBuf, std::io::Error),
     69     #[error(transparent)]
     70     Sql(#[from] sqlx::Error),
     71 }
     72 
     73 pub async fn dbinit(
     74     conn: &mut PgConnection,
     75     sql_dir: &Path,
     76     prefix: &str,
     77     reset: bool,
     78 ) -> Result<(), MigrationErr> {
     79     let mut tx = conn.begin().await?;
     80 
     81     let exec_sql_file =
     82         async |conn: &mut PgConnection, file: &str, action: &str| -> Result<(), MigrationErr> {
     83             let path = sql_dir.join(file);
     84             match std::fs::read_to_string(&path) {
     85                 Ok(content) => {
     86                     info!(target: "dbinit", "applying {action}");
     87                     sqlx::raw_sql(&content).execute(conn).await?;
     88                     Ok(())
     89                 }
     90                 Err(e) => Err(MigrationErr::Io(path, e)),
     91             }
     92         };
     93 
     94     if reset {
     95         info!(target: "dbinit", "reset, sqlqir '{}'", sql_dir.to_string_lossy());
     96         exec_sql_file(&mut *tx, &format!("{prefix}-drop.sql"), "drop").await?;
     97     }
     98 
     99     info!(target: "dbinit", "initialization, sqlqir '{}'", sql_dir.to_string_lossy());
    100 
    101     exec_sql_file(&mut *tx, "versioning.sql", "versioning").await?;
    102 
    103     let applied: Vec<String> = sqlx::query("SELECT patch_name FROM _v.patches")
    104         .try_map(|r: PgRow| r.try_get(0))
    105         .fetch_all(&mut *tx)
    106         .await?;
    107     for n in 1..9999 {
    108         let patch = format!("{prefix}-{n:0>4}");
    109         if applied.contains(&patch) {
    110             debug!(target: "dbinit", "patch {patch} already applied");
    111             continue;
    112         }
    113 
    114         if let Err(e) =
    115             exec_sql_file(&mut *tx, &format!("{patch}.sql"), &format!("patch {patch}")).await
    116         {
    117             if let MigrationErr::Io(path, e) = &e
    118                 && e.kind() == ErrorKind::NotFound
    119             {
    120                 debug!(
    121                     target: "dbinit",
    122                     "path '{}' doesn't exist anymore, stopping",
    123                     path.to_string_lossy()
    124                 );
    125                 break;
    126             }
    127             return Err(e);
    128         }
    129     }
    130 
    131     exec_sql_file(&mut *tx, &format!("{prefix}-procedures.sql"), "procedures").await?;
    132 
    133     tx.commit().await?;
    134 
    135     Ok(())
    136 }