db.rs (4061B)
1 /* 2 This file is part of TALER 3 Copyright (C) 2025, 2026 Taler Systems SA 4 5 TALER is free software; you can redistribute it and/or modify it under the 6 terms of the GNU Affero General Public License as published by the Free Software 7 Foundation; either version 3, or (at your option) any later version. 8 9 TALER is distributed in the hope that it will be useful, but WITHOUT ANY 10 WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR 11 A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. 12 13 You should have received a copy of the GNU Affero General Public License along with 14 TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> 15 */ 16 17 use std::{ 18 io::ErrorKind, 19 path::{Path, PathBuf}, 20 sync::Arc, 21 }; 22 23 use sqlx::{ 24 Connection, Executor, PgPool, Row, 25 postgres::{PgConnectOptions, PgPoolOptions}, 26 }; 27 use sqlx::{PgConnection, postgres::PgRow}; 28 use tracing::{debug, info}; 29 30 #[derive(Debug, Clone, Copy, PartialEq, Eq, sqlx::Type)] 31 #[allow(non_camel_case_types)] 32 #[sqlx(type_name = "incoming_type")] 33 pub enum IncomingType { 34 reserve, 35 kyc, 36 map, 37 } 38 39 /* ----- Pool ----- */ 40 41 pub async fn pool(cfg: PgConnectOptions, schema: &str) -> sqlx::Result<PgPool> { 42 let init_sql = Arc::new(format!( 43 "SET search_path TO {schema};SET default_transaction_isolation = 'serializable';" 44 )); 45 let pool = PgPoolOptions::new() 46 .after_connect(move |conn, _meta| { 47 let init_sql = init_sql.clone(); 48 Box::pin(async move { 49 conn.execute(init_sql.as_str()).await?; 50 51 Ok(()) 52 }) 53 }) 54 .connect_with(cfg) 55 .await?; 56 57 // TODO check postgresql version ? 58 59 Ok(pool) 60 } 61 62 /* ----- Migration----- */ 63 64 #[derive(Debug, thiserror::Error)] 65 pub enum MigrationErr { 66 #[error("could not read patch at '{}': {}", .0.to_string_lossy(), .1.kind())] 67 Io(PathBuf, std::io::Error), 68 #[error(transparent)] 69 Sql(#[from] sqlx::Error), 70 } 71 72 pub async fn dbinit( 73 conn: &mut PgConnection, 74 sql_dir: &Path, 75 prefix: &str, 76 reset: bool, 77 ) -> Result<(), MigrationErr> { 78 let mut tx = conn.begin().await?; 79 80 let exec_sql_file = 81 async |conn: &mut PgConnection, file: &str, action: &str| -> Result<(), MigrationErr> { 82 let path = sql_dir.join(file); 83 match std::fs::read_to_string(&path) { 84 Ok(content) => { 85 info!(target: "dbinit", "applying {action}"); 86 sqlx::raw_sql(&content).execute(conn).await?; 87 Ok(()) 88 } 89 Err(e) => Err(MigrationErr::Io(path, e)), 90 } 91 }; 92 93 if reset { 94 info!(target: "dbinit", "reset, sqlqir '{}'", sql_dir.to_string_lossy()); 95 exec_sql_file(&mut *tx, &format!("{prefix}-drop.sql"), "drop").await?; 96 } 97 98 info!(target: "dbinit", "initialization, sqlqir '{}'", sql_dir.to_string_lossy()); 99 100 exec_sql_file(&mut *tx, "versioning.sql", "versioning").await?; 101 102 let applied: Vec<String> = sqlx::query("SELECT patch_name FROM _v.patches") 103 .try_map(|r: PgRow| r.try_get(0)) 104 .fetch_all(&mut *tx) 105 .await?; 106 for n in 1..9999 { 107 let patch = format!("{prefix}-{n:0>4}"); 108 if applied.contains(&patch) { 109 debug!(target: "dbinit", "patch {patch} already applied"); 110 continue; 111 } 112 113 if let Err(e) = 114 exec_sql_file(&mut *tx, &format!("{patch}.sql"), &format!("patch {patch}")).await 115 { 116 if let MigrationErr::Io(path, e) = &e 117 && e.kind() == ErrorKind::NotFound 118 { 119 debug!( 120 target: "dbinit", 121 "path '{}' doesn't exist anymore, stopping", 122 path.to_string_lossy() 123 ); 124 break; 125 } 126 return Err(e); 127 } 128 } 129 130 exec_sql_file(&mut *tx, &format!("{prefix}-procedures.sql"), "procedures").await?; 131 132 tx.commit().await?; 133 134 Ok(()) 135 }