db.rs (4760B)
1 /* 2 This file is part of TALER 3 Copyright (C) 2025, 2026 Taler Systems SA 4 5 TALER is free software; you can redistribute it and/or modify it under the 6 terms of the GNU Affero General Public License as published by the Free Software 7 Foundation; either version 3, or (at your option) any later version. 8 9 TALER is distributed in the hope that it will be useful, but WITHOUT ANY 10 WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR 11 A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. 12 13 You should have received a copy of the GNU Affero General Public License along with 14 TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> 15 */ 16 17 use std::{ 18 io::ErrorKind, 19 path::{Path, PathBuf}, 20 sync::Arc, 21 }; 22 23 use sqlx::{ 24 Connection, Executor, PgConnection, PgPool, Postgres, Row, Transaction, 25 postgres::{PgConnectOptions, PgPoolOptions, PgRow}, 26 }; 27 use taler_macros::EnumMeta; 28 use tracing::{debug, info}; 29 30 #[derive(Debug, Clone, Copy, PartialEq, Eq, sqlx::Type, EnumMeta)] 31 #[allow(non_camel_case_types)] 32 #[sqlx(type_name = "incoming_type")] 33 #[enum_meta(Str)] 34 pub enum IncomingType { 35 reserve, 36 kyc, 37 map, 38 } 39 40 /* ----- Pool ----- */ 41 42 pub async fn pool(cfg: PgConnectOptions, schema: &str) -> sqlx::Result<PgPool> { 43 let init_sql = Arc::new(format!( 44 "SET search_path TO {schema};SET default_transaction_isolation = 'serializable';" 45 )); 46 let pool = PgPoolOptions::new() 47 .after_connect(move |conn, _meta| { 48 let init_sql = init_sql.clone(); 49 Box::pin(async move { 50 conn.execute(init_sql.as_str()).await?; 51 52 Ok(()) 53 }) 54 }) 55 .connect_with(cfg) 56 .await?; 57 58 // TODO check postgresql version ? 59 60 Ok(pool) 61 } 62 63 /* ----- Migration----- */ 64 65 #[derive(Debug, thiserror::Error)] 66 pub enum MigrationErr { 67 #[error("could not read patch at '{}': {}", .0.to_string_lossy(), .1.kind())] 68 Io(PathBuf, std::io::Error), 69 #[error(transparent)] 70 Sql(#[from] sqlx::Error), 71 } 72 73 pub async fn dbinit( 74 conn: &mut PgConnection, 75 sql_dir: &Path, 76 prefix: &str, 77 reset: bool, 78 ) -> anyhow::Result<()> { 79 dbinit_setup(conn, sql_dir, prefix, reset, async |_| Ok(())).await 80 } 81 82 pub async fn dbinit_setup( 83 conn: &mut PgConnection, 84 sql_dir: &Path, 85 prefix: &str, 86 reset: bool, 87 setup: impl AsyncFn(&mut Transaction<Postgres>) -> anyhow::Result<()>, 88 ) -> anyhow::Result<()> { 89 let mut tx = conn.begin().await?; 90 91 let exec_sql_file = 92 async |conn: &mut PgConnection, file: &str, action: &str| -> Result<(), MigrationErr> { 93 let path = sql_dir.join(file); 94 match std::fs::read_to_string(&path) { 95 Ok(content) => { 96 info!(target: "dbinit", "applying {action}"); 97 sqlx::raw_sql(&content).execute(conn).await?; 98 Ok(()) 99 } 100 Err(e) => Err(MigrationErr::Io(path, e)), 101 } 102 }; 103 104 if reset { 105 info!(target: "dbinit", "reset, sqlqir '{}'", sql_dir.to_string_lossy()); 106 exec_sql_file(&mut *tx, &format!("{prefix}-drop.sql"), "drop").await?; 107 } 108 109 info!(target: "dbinit", "initialization, sqlqir '{}'", sql_dir.to_string_lossy()); 110 111 exec_sql_file(&mut *tx, "versioning.sql", "versioning").await?; 112 113 let applied: Vec<String> = sqlx::query("SELECT patch_name FROM _v.patches") 114 .try_map(|r: PgRow| r.try_get(0)) 115 .fetch_all(&mut *tx) 116 .await?; 117 for n in 1..9999 { 118 let patch = format!("{prefix}-{n:0>4}"); 119 if applied.contains(&patch) { 120 debug!(target: "dbinit", "patch {patch} already applied"); 121 continue; 122 } 123 124 if let Err(e) = 125 exec_sql_file(&mut *tx, &format!("{patch}.sql"), &format!("patch {patch}")).await 126 { 127 if let MigrationErr::Io(path, e) = &e 128 && e.kind() == ErrorKind::NotFound 129 { 130 debug!( 131 target: "dbinit", 132 "path '{}' doesn't exist anymore, stopping", 133 path.to_string_lossy() 134 ); 135 break; 136 } 137 return Err(e.into()); 138 } 139 } 140 141 if let Err(e) = exec_sql_file(&mut *tx, &format!("{prefix}-procedures.sql"), "procedures").await 142 { 143 if let MigrationErr::Io(_, e) = &e 144 && e.kind() == ErrorKind::NotFound 145 { 146 debug!( 147 target: "dbinit", 148 "no procedures.sql for the SQL collection: '{prefix}'" 149 ); 150 } else { 151 return Err(e.into()); 152 } 153 } 154 155 setup(&mut tx).await?; 156 157 tx.commit().await?; 158 159 Ok(()) 160 }