taler-rust

GNU Taler code in Rust. Largely core banking integrations.
Log | Files | Refs | Submodules | README | LICENSE

db.rs (4061B)


      1 /*
      2   This file is part of TALER
      3   Copyright (C) 2025, 2026 Taler Systems SA
      4 
      5   TALER is free software; you can redistribute it and/or modify it under the
      6   terms of the GNU Affero General Public License as published by the Free Software
      7   Foundation; either version 3, or (at your option) any later version.
      8 
      9   TALER is distributed in the hope that it will be useful, but WITHOUT ANY
     10   WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
     11   A PARTICULAR PURPOSE.  See the GNU Affero General Public License for more details.
     12 
     13   You should have received a copy of the GNU Affero General Public License along with
     14   TALER; see the file COPYING.  If not, see <http://www.gnu.org/licenses/>
     15 */
     16 
     17 use std::{
     18     io::ErrorKind,
     19     path::{Path, PathBuf},
     20     sync::Arc,
     21 };
     22 
     23 use sqlx::{
     24     Connection, Executor, PgPool, Row,
     25     postgres::{PgConnectOptions, PgPoolOptions},
     26 };
     27 use sqlx::{PgConnection, postgres::PgRow};
     28 use tracing::{debug, info};
     29 
     30 #[derive(Debug, Clone, Copy, PartialEq, Eq, sqlx::Type)]
     31 #[allow(non_camel_case_types)]
     32 #[sqlx(type_name = "incoming_type")]
     33 pub enum IncomingType {
     34     reserve,
     35     kyc,
     36     map,
     37 }
     38 
     39 /* ----- Pool ----- */
     40 
     41 pub async fn pool(cfg: PgConnectOptions, schema: &str) -> sqlx::Result<PgPool> {
     42     let init_sql = Arc::new(format!(
     43         "SET search_path TO {schema};SET default_transaction_isolation = 'serializable';"
     44     ));
     45     let pool = PgPoolOptions::new()
     46         .after_connect(move |conn, _meta| {
     47             let init_sql = init_sql.clone();
     48             Box::pin(async move {
     49                 conn.execute(init_sql.as_str()).await?;
     50 
     51                 Ok(())
     52             })
     53         })
     54         .connect_with(cfg)
     55         .await?;
     56 
     57     // TODO check postgresql version ?
     58 
     59     Ok(pool)
     60 }
     61 
     62 /* ----- Migration----- */
     63 
     64 #[derive(Debug, thiserror::Error)]
     65 pub enum MigrationErr {
     66     #[error("could not read patch at '{}': {}", .0.to_string_lossy(), .1.kind())]
     67     Io(PathBuf, std::io::Error),
     68     #[error(transparent)]
     69     Sql(#[from] sqlx::Error),
     70 }
     71 
     72 pub async fn dbinit(
     73     conn: &mut PgConnection,
     74     sql_dir: &Path,
     75     prefix: &str,
     76     reset: bool,
     77 ) -> Result<(), MigrationErr> {
     78     let mut tx = conn.begin().await?;
     79 
     80     let exec_sql_file =
     81         async |conn: &mut PgConnection, file: &str, action: &str| -> Result<(), MigrationErr> {
     82             let path = sql_dir.join(file);
     83             match std::fs::read_to_string(&path) {
     84                 Ok(content) => {
     85                     info!(target: "dbinit", "applying {action}");
     86                     sqlx::raw_sql(&content).execute(conn).await?;
     87                     Ok(())
     88                 }
     89                 Err(e) => Err(MigrationErr::Io(path, e)),
     90             }
     91         };
     92 
     93     if reset {
     94         info!(target: "dbinit", "reset, sqlqir '{}'", sql_dir.to_string_lossy());
     95         exec_sql_file(&mut *tx, &format!("{prefix}-drop.sql"), "drop").await?;
     96     }
     97 
     98     info!(target: "dbinit", "initialization, sqlqir '{}'", sql_dir.to_string_lossy());
     99 
    100     exec_sql_file(&mut *tx, "versioning.sql", "versioning").await?;
    101 
    102     let applied: Vec<String> = sqlx::query("SELECT patch_name FROM _v.patches")
    103         .try_map(|r: PgRow| r.try_get(0))
    104         .fetch_all(&mut *tx)
    105         .await?;
    106     for n in 1..9999 {
    107         let patch = format!("{prefix}-{n:0>4}");
    108         if applied.contains(&patch) {
    109             debug!(target: "dbinit", "patch {patch} already applied");
    110             continue;
    111         }
    112 
    113         if let Err(e) =
    114             exec_sql_file(&mut *tx, &format!("{patch}.sql"), &format!("patch {patch}")).await
    115         {
    116             if let MigrationErr::Io(path, e) = &e
    117                 && e.kind() == ErrorKind::NotFound
    118             {
    119                 debug!(
    120                     target: "dbinit",
    121                     "path '{}' doesn't exist anymore, stopping",
    122                     path.to_string_lossy()
    123                 );
    124                 break;
    125             }
    126             return Err(e);
    127         }
    128     }
    129 
    130     exec_sql_file(&mut *tx, &format!("{prefix}-procedures.sql"), "procedures").await?;
    131 
    132     tx.commit().await?;
    133 
    134     Ok(())
    135 }