db.rs (4095B)
1 /* 2 This file is part of TALER 3 Copyright (C) 2025, 2026 Taler Systems SA 4 5 TALER is free software; you can redistribute it and/or modify it under the 6 terms of the GNU Affero General Public License as published by the Free Software 7 Foundation; either version 3, or (at your option) any later version. 8 9 TALER is distributed in the hope that it will be useful, but WITHOUT ANY 10 WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR 11 A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. 12 13 You should have received a copy of the GNU Affero General Public License along with 14 TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> 15 */ 16 17 use std::{ 18 io::ErrorKind, 19 path::{Path, PathBuf}, 20 sync::Arc, 21 }; 22 23 use sqlx::{ 24 Connection, Executor, PgConnection, PgPool, Row, 25 postgres::{PgConnectOptions, PgPoolOptions, PgRow}, 26 }; 27 use taler_macros::EnumMeta; 28 use tracing::{debug, info}; 29 30 #[derive(Debug, Clone, Copy, PartialEq, Eq, sqlx::Type, EnumMeta)] 31 #[allow(non_camel_case_types)] 32 #[sqlx(type_name = "incoming_type")] 33 #[enum_meta(Str)] 34 pub enum IncomingType { 35 reserve, 36 kyc, 37 map, 38 } 39 40 /* ----- Pool ----- */ 41 42 pub async fn pool(cfg: PgConnectOptions, schema: &str) -> sqlx::Result<PgPool> { 43 let init_sql = Arc::new(format!( 44 "SET search_path TO {schema};SET default_transaction_isolation = 'serializable';" 45 )); 46 let pool = PgPoolOptions::new() 47 .after_connect(move |conn, _meta| { 48 let init_sql = init_sql.clone(); 49 Box::pin(async move { 50 conn.execute(init_sql.as_str()).await?; 51 52 Ok(()) 53 }) 54 }) 55 .connect_with(cfg) 56 .await?; 57 58 // TODO check postgresql version ? 59 60 Ok(pool) 61 } 62 63 /* ----- Migration----- */ 64 65 #[derive(Debug, thiserror::Error)] 66 pub enum MigrationErr { 67 #[error("could not read patch at '{}': {}", .0.to_string_lossy(), .1.kind())] 68 Io(PathBuf, std::io::Error), 69 #[error(transparent)] 70 Sql(#[from] sqlx::Error), 71 } 72 73 pub async fn dbinit( 74 conn: &mut PgConnection, 75 sql_dir: &Path, 76 prefix: &str, 77 reset: bool, 78 ) -> Result<(), MigrationErr> { 79 let mut tx = conn.begin().await?; 80 81 let exec_sql_file = 82 async |conn: &mut PgConnection, file: &str, action: &str| -> Result<(), MigrationErr> { 83 let path = sql_dir.join(file); 84 match std::fs::read_to_string(&path) { 85 Ok(content) => { 86 info!(target: "dbinit", "applying {action}"); 87 sqlx::raw_sql(&content).execute(conn).await?; 88 Ok(()) 89 } 90 Err(e) => Err(MigrationErr::Io(path, e)), 91 } 92 }; 93 94 if reset { 95 info!(target: "dbinit", "reset, sqlqir '{}'", sql_dir.to_string_lossy()); 96 exec_sql_file(&mut *tx, &format!("{prefix}-drop.sql"), "drop").await?; 97 } 98 99 info!(target: "dbinit", "initialization, sqlqir '{}'", sql_dir.to_string_lossy()); 100 101 exec_sql_file(&mut *tx, "versioning.sql", "versioning").await?; 102 103 let applied: Vec<String> = sqlx::query("SELECT patch_name FROM _v.patches") 104 .try_map(|r: PgRow| r.try_get(0)) 105 .fetch_all(&mut *tx) 106 .await?; 107 for n in 1..9999 { 108 let patch = format!("{prefix}-{n:0>4}"); 109 if applied.contains(&patch) { 110 debug!(target: "dbinit", "patch {patch} already applied"); 111 continue; 112 } 113 114 if let Err(e) = 115 exec_sql_file(&mut *tx, &format!("{patch}.sql"), &format!("patch {patch}")).await 116 { 117 if let MigrationErr::Io(path, e) = &e 118 && e.kind() == ErrorKind::NotFound 119 { 120 debug!( 121 target: "dbinit", 122 "path '{}' doesn't exist anymore, stopping", 123 path.to_string_lossy() 124 ); 125 break; 126 } 127 return Err(e); 128 } 129 } 130 131 exec_sql_file(&mut *tx, &format!("{prefix}-procedures.sql"), "procedures").await?; 132 133 tx.commit().await?; 134 135 Ok(()) 136 }