use rusqlite::Connection; use sqlite_vec::sqlite3_vec_init; use std::fs; use std::path::Path; use tracing::{debug, info}; use super::error::{LoreError, Result}; pub const LATEST_SCHEMA_VERSION: i32 = MIGRATIONS.len() as i32; const MIGRATIONS: &[(&str, &str)] = &[ ("001", include_str!("../../migrations/001_initial.sql")), ("002", include_str!("../../migrations/002_issues.sql")), ("003", include_str!("../../migrations/003_indexes.sql")), ( "004", include_str!("../../migrations/004_discussions_payload.sql"), ), ( "005", include_str!("../../migrations/005_assignees_milestone_duedate.sql"), ), ( "006", include_str!("../../migrations/006_merge_requests.sql"), ), ("007", include_str!("../../migrations/007_documents.sql")), ("008", include_str!("../../migrations/008_fts5.sql")), ("009", include_str!("../../migrations/009_embeddings.sql")), ("010", include_str!("../../migrations/010_chunk_config.sql")), ( "011", include_str!("../../migrations/011_resource_events.sql"), ), ( "012", include_str!("../../migrations/012_nullable_label_milestone.sql"), ), ( "013", include_str!("../../migrations/013_resource_event_watermarks.sql"), ), ( "014", include_str!("../../migrations/014_sync_runs_enrichment.sql"), ), ( "015", include_str!("../../migrations/015_commit_shas_and_closes_watermark.sql"), ), ( "016", include_str!("../../migrations/016_mr_file_changes.sql"), ), ("017", include_str!("../../migrations/017_who_indexes.sql")), ( "018", include_str!("../../migrations/018_fix_assignees_composite_index.sql"), ), ( "019", include_str!("../../migrations/019_list_performance.sql"), ), ( "020", include_str!("../../migrations/020_mr_diffs_watermark.sql"), ), ( "021", include_str!("../../migrations/021_work_item_status.sql"), ), ( "023", include_str!("../../migrations/023_issue_detail_fields.sql"), ), ]; pub fn create_connection(db_path: &Path) -> Result { // SAFETY: `sqlite3_vec_init` is an extern "C" function provided by the sqlite-vec // crate with the exact signature expected by `sqlite3_auto_extension`. The transmute // converts the concrete function pointer to the `Option` type // that the FFI expects. This is safe because: // 1. The function is a C-ABI init callback with a stable signature. // 2. SQLite calls it once per new connection, matching sqlite-vec's contract. // 3. `sqlite3_auto_extension` is idempotent for the same function pointer. #[allow(clippy::missing_transmute_annotations)] unsafe { rusqlite::ffi::sqlite3_auto_extension(Some(std::mem::transmute( sqlite3_vec_init as *const (), ))); } if let Some(parent) = db_path.parent() { fs::create_dir_all(parent)?; } let conn = Connection::open(db_path)?; conn.pragma_update(None, "journal_mode", "WAL")?; conn.pragma_update(None, "synchronous", "NORMAL")?; conn.pragma_update(None, "foreign_keys", "ON")?; conn.pragma_update(None, "busy_timeout", 5000)?; conn.pragma_update(None, "temp_store", "MEMORY")?; conn.pragma_update(None, "cache_size", -64000)?; conn.pragma_update(None, "mmap_size", 268_435_456)?; debug!(db_path = %db_path.display(), "Database connection created"); Ok(conn) } pub fn run_migrations(conn: &Connection) -> Result<()> { let has_version_table: bool = conn .query_row( "SELECT COUNT(*) > 0 FROM sqlite_master WHERE type='table' AND name='schema_version'", [], |row| row.get(0), ) .unwrap_or(false); let current_version: i32 = if has_version_table { conn.query_row( "SELECT COALESCE(MAX(version), 0) FROM schema_version", [], |row| row.get(0), ) .unwrap_or(0) } else { 0 }; for (version_str, sql) in MIGRATIONS { let version: i32 = version_str.parse().expect("Invalid migration version"); if version <= current_version { debug!(version, "Migration already applied"); continue; } let savepoint_name = format!("migration_{}", version); conn.execute_batch(&format!("SAVEPOINT {}", savepoint_name)) .map_err(|e| LoreError::MigrationFailed { version, message: format!("Failed to create savepoint: {}", e), source: Some(e), })?; match conn.execute_batch(sql) { Ok(()) => { // Framework-managed version bookkeeping: ensures the version is // always recorded even if a migration .sql omits the INSERT. // Uses OR REPLACE so legacy migrations that self-register are harmless. conn.execute( "INSERT OR REPLACE INTO schema_version (version, applied_at, description) \ VALUES (?1, strftime('%s', 'now') * 1000, ?2)", rusqlite::params![version, version_str], ) .map_err(|e| LoreError::MigrationFailed { version, message: format!("Failed to record schema version: {e}"), source: Some(e), })?; conn.execute_batch(&format!("RELEASE {}", savepoint_name)) .map_err(|e| LoreError::MigrationFailed { version, message: format!("Failed to release savepoint: {}", e), source: Some(e), })?; } Err(e) => { let _ = conn.execute_batch(&format!("ROLLBACK TO {}", savepoint_name)); let _ = conn.execute_batch(&format!("RELEASE {}", savepoint_name)); return Err(LoreError::MigrationFailed { version, message: e.to_string(), source: Some(e), }); } } info!(version, "Migration applied"); } Ok(()) } #[allow(dead_code)] pub fn run_migrations_from_dir(conn: &Connection, migrations_dir: &Path) -> Result<()> { let has_version_table: bool = conn .query_row( "SELECT COUNT(*) > 0 FROM sqlite_master WHERE type='table' AND name='schema_version'", [], |row| row.get(0), ) .unwrap_or(false); let current_version: i32 = if has_version_table { conn.query_row( "SELECT COALESCE(MAX(version), 0) FROM schema_version", [], |row| row.get(0), ) .unwrap_or(0) } else { 0 }; let mut migrations: Vec<_> = fs::read_dir(migrations_dir)? .filter_map(|entry| entry.ok()) .filter(|entry| entry.path().extension().is_some_and(|ext| ext == "sql")) .collect(); migrations.sort_by_key(|entry| entry.file_name()); for entry in migrations { let filename = entry.file_name(); let filename_str = filename.to_string_lossy(); let version: i32 = match filename_str.split('_').next().and_then(|v| v.parse().ok()) { Some(v) => v, None => continue, }; if version <= current_version { continue; } let sql = fs::read_to_string(entry.path())?; let savepoint_name = format!("migration_{}", version); conn.execute_batch(&format!("SAVEPOINT {}", savepoint_name)) .map_err(|e| LoreError::MigrationFailed { version, message: format!("Failed to create savepoint: {}", e), source: Some(e), })?; match conn.execute_batch(&sql) { Ok(()) => { conn.execute_batch(&format!("RELEASE {}", savepoint_name)) .map_err(|e| LoreError::MigrationFailed { version, message: format!("Failed to release savepoint: {}", e), source: Some(e), })?; } Err(e) => { let _ = conn.execute_batch(&format!("ROLLBACK TO {}", savepoint_name)); let _ = conn.execute_batch(&format!("RELEASE {}", savepoint_name)); return Err(LoreError::MigrationFailed { version, message: e.to_string(), source: Some(e), }); } } info!(version, file = %filename_str, "Migration applied"); } Ok(()) } pub fn verify_pragmas(conn: &Connection) -> (bool, Vec) { let mut issues = Vec::new(); let journal_mode: String = conn .pragma_query_value(None, "journal_mode", |row| row.get(0)) .unwrap_or_default(); if journal_mode != "wal" { issues.push(format!("journal_mode is {journal_mode}, expected 'wal'")); } let foreign_keys: i32 = conn .pragma_query_value(None, "foreign_keys", |row| row.get(0)) .unwrap_or(0); if foreign_keys != 1 { issues.push(format!("foreign_keys is {foreign_keys}, expected 1")); } let busy_timeout: i32 = conn .pragma_query_value(None, "busy_timeout", |row| row.get(0)) .unwrap_or(0); if busy_timeout != 5000 { issues.push(format!("busy_timeout is {busy_timeout}, expected 5000")); } let synchronous: i32 = conn .pragma_query_value(None, "synchronous", |row| row.get(0)) .unwrap_or(0); if synchronous != 1 { issues.push(format!("synchronous is {synchronous}, expected 1 (NORMAL)")); } (issues.is_empty(), issues) } pub fn get_schema_version(conn: &Connection) -> i32 { let has_version_table: bool = conn .query_row( "SELECT COUNT(*) > 0 FROM sqlite_master WHERE type='table' AND name='schema_version'", [], |row| row.get(0), ) .unwrap_or(false); if !has_version_table { return 0; } conn.query_row( "SELECT COALESCE(MAX(version), 0) FROM schema_version", [], |row| row.get(0), ) .unwrap_or(0) }