Add foreign key constraint on discussions.merge_request_id to prevent orphaned discussions when MRs are deleted.

SQLite doesn't support ALTER TABLE ADD CONSTRAINT, so this migration recreates the table with:
1. New table with FK: REFERENCES merge_requests(id) ON DELETE CASCADE
2. Data copy with FK validation (only copies rows with valid MR references)
3. Table swap (DROP old, RENAME new)
4. Full index recreation (all 10 indexes from migrations 002-022)

The migration also includes a CHECK constraint ensuring mutual exclusivity:
- Issue discussions have issue_id NOT NULL and merge_request_id NULL
- MR discussions have merge_request_id NOT NULL and issue_id NULL

Also fixes run_migrations() to properly propagate query errors instead of silently falling back to unwrap_or defaults, improving error diagnostics.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
346 lines
11 KiB
Rust
346 lines
11 KiB
Rust
use rusqlite::Connection;
|
|
use sqlite_vec::sqlite3_vec_init;
|
|
use std::fs;
|
|
use std::path::Path;
|
|
use tracing::{debug, info};
|
|
|
|
use super::error::{LoreError, Result};
|
|
|
|
/// Highest schema version this build can migrate a database to.
///
/// Computed as the number of entries in `MIGRATIONS`, so it matches the newest
/// migration's version tag only while that table stays contiguous and starts at 1.
pub const LATEST_SCHEMA_VERSION: i32 = MIGRATIONS.len() as i32;
|
|
|
|
const MIGRATIONS: &[(&str, &str)] = &[
|
|
("001", include_str!("../../migrations/001_initial.sql")),
|
|
("002", include_str!("../../migrations/002_issues.sql")),
|
|
("003", include_str!("../../migrations/003_indexes.sql")),
|
|
(
|
|
"004",
|
|
include_str!("../../migrations/004_discussions_payload.sql"),
|
|
),
|
|
(
|
|
"005",
|
|
include_str!("../../migrations/005_assignees_milestone_duedate.sql"),
|
|
),
|
|
(
|
|
"006",
|
|
include_str!("../../migrations/006_merge_requests.sql"),
|
|
),
|
|
("007", include_str!("../../migrations/007_documents.sql")),
|
|
("008", include_str!("../../migrations/008_fts5.sql")),
|
|
("009", include_str!("../../migrations/009_embeddings.sql")),
|
|
("010", include_str!("../../migrations/010_chunk_config.sql")),
|
|
(
|
|
"011",
|
|
include_str!("../../migrations/011_resource_events.sql"),
|
|
),
|
|
(
|
|
"012",
|
|
include_str!("../../migrations/012_nullable_label_milestone.sql"),
|
|
),
|
|
(
|
|
"013",
|
|
include_str!("../../migrations/013_resource_event_watermarks.sql"),
|
|
),
|
|
(
|
|
"014",
|
|
include_str!("../../migrations/014_sync_runs_enrichment.sql"),
|
|
),
|
|
(
|
|
"015",
|
|
include_str!("../../migrations/015_commit_shas_and_closes_watermark.sql"),
|
|
),
|
|
(
|
|
"016",
|
|
include_str!("../../migrations/016_mr_file_changes.sql"),
|
|
),
|
|
("017", include_str!("../../migrations/017_who_indexes.sql")),
|
|
(
|
|
"018",
|
|
include_str!("../../migrations/018_fix_assignees_composite_index.sql"),
|
|
),
|
|
(
|
|
"019",
|
|
include_str!("../../migrations/019_list_performance.sql"),
|
|
),
|
|
(
|
|
"020",
|
|
include_str!("../../migrations/020_mr_diffs_watermark.sql"),
|
|
),
|
|
(
|
|
"021",
|
|
include_str!("../../migrations/021_work_item_status.sql"),
|
|
),
|
|
(
|
|
"022",
|
|
include_str!("../../migrations/022_notes_query_index.sql"),
|
|
),
|
|
(
|
|
"023",
|
|
include_str!("../../migrations/023_issue_detail_fields.sql"),
|
|
),
|
|
(
|
|
"024",
|
|
include_str!("../../migrations/024_note_documents.sql"),
|
|
),
|
|
(
|
|
"025",
|
|
include_str!("../../migrations/025_note_dirty_backfill.sql"),
|
|
),
|
|
(
|
|
"026",
|
|
include_str!("../../migrations/026_scoring_indexes.sql"),
|
|
),
|
|
(
|
|
"027",
|
|
include_str!("../../migrations/027_surgical_sync_runs.sql"),
|
|
),
|
|
(
|
|
"028",
|
|
include_str!("../../migrations/028_discussions_mr_fk.sql"),
|
|
),
|
|
];
|
|
|
|
pub fn create_connection(db_path: &Path) -> Result<Connection> {
|
|
// SAFETY: `sqlite3_vec_init` is an extern "C" function provided by the sqlite-vec
|
|
// crate with the exact signature expected by `sqlite3_auto_extension`. The transmute
|
|
// converts the concrete function pointer to the `Option<unsafe extern "C" fn()>` type
|
|
// that the FFI expects. This is safe because:
|
|
// 1. The function is a C-ABI init callback with a stable signature.
|
|
// 2. SQLite calls it once per new connection, matching sqlite-vec's contract.
|
|
// 3. `sqlite3_auto_extension` is idempotent for the same function pointer.
|
|
#[allow(clippy::missing_transmute_annotations)]
|
|
unsafe {
|
|
rusqlite::ffi::sqlite3_auto_extension(Some(std::mem::transmute(
|
|
sqlite3_vec_init as *const (),
|
|
)));
|
|
}
|
|
|
|
if let Some(parent) = db_path.parent() {
|
|
fs::create_dir_all(parent)?;
|
|
}
|
|
|
|
let conn = Connection::open(db_path)?;
|
|
|
|
conn.pragma_update(None, "journal_mode", "WAL")?;
|
|
conn.pragma_update(None, "synchronous", "NORMAL")?;
|
|
conn.pragma_update(None, "foreign_keys", "ON")?;
|
|
conn.pragma_update(None, "busy_timeout", 5000)?;
|
|
conn.pragma_update(None, "temp_store", "MEMORY")?;
|
|
conn.pragma_update(None, "cache_size", -64000)?;
|
|
conn.pragma_update(None, "mmap_size", 268_435_456)?;
|
|
|
|
debug!(db_path = %db_path.display(), "Database connection created");
|
|
|
|
Ok(conn)
|
|
}
|
|
|
|
pub fn run_migrations(conn: &Connection) -> Result<()> {
|
|
// Note: sqlite_master always exists, so errors here indicate real DB problems
|
|
// (corruption, locked, etc.) - we must not silently treat them as "fresh DB"
|
|
let has_version_table: bool = conn.query_row(
|
|
"SELECT COUNT(*) > 0 FROM sqlite_master WHERE type='table' AND name='schema_version'",
|
|
[],
|
|
|row| row.get(0),
|
|
)?;
|
|
|
|
let current_version: i32 = if has_version_table {
|
|
conn.query_row(
|
|
"SELECT COALESCE(MAX(version), 0) FROM schema_version",
|
|
[],
|
|
|row| row.get(0),
|
|
)?
|
|
} else {
|
|
0
|
|
};
|
|
|
|
for (version_str, sql) in MIGRATIONS {
|
|
let version: i32 = version_str.parse().expect("Invalid migration version");
|
|
|
|
if version <= current_version {
|
|
debug!(version, "Migration already applied");
|
|
continue;
|
|
}
|
|
|
|
let savepoint_name = format!("migration_{}", version);
|
|
conn.execute_batch(&format!("SAVEPOINT {}", savepoint_name))
|
|
.map_err(|e| LoreError::MigrationFailed {
|
|
version,
|
|
message: format!("Failed to create savepoint: {}", e),
|
|
source: Some(e),
|
|
})?;
|
|
|
|
match conn.execute_batch(sql) {
|
|
Ok(()) => {
|
|
// Framework-managed version bookkeeping: ensures the version is
|
|
// always recorded even if a migration .sql omits the INSERT.
|
|
// Uses OR REPLACE so legacy migrations that self-register are harmless.
|
|
conn.execute(
|
|
"INSERT OR REPLACE INTO schema_version (version, applied_at, description) \
|
|
VALUES (?1, strftime('%s', 'now') * 1000, ?2)",
|
|
rusqlite::params![version, version_str],
|
|
)
|
|
.map_err(|e| LoreError::MigrationFailed {
|
|
version,
|
|
message: format!("Failed to record schema version: {e}"),
|
|
source: Some(e),
|
|
})?;
|
|
|
|
conn.execute_batch(&format!("RELEASE {}", savepoint_name))
|
|
.map_err(|e| LoreError::MigrationFailed {
|
|
version,
|
|
message: format!("Failed to release savepoint: {}", e),
|
|
source: Some(e),
|
|
})?;
|
|
}
|
|
Err(e) => {
|
|
let _ = conn.execute_batch(&format!("ROLLBACK TO {}", savepoint_name));
|
|
let _ = conn.execute_batch(&format!("RELEASE {}", savepoint_name));
|
|
return Err(LoreError::MigrationFailed {
|
|
version,
|
|
message: e.to_string(),
|
|
source: Some(e),
|
|
});
|
|
}
|
|
}
|
|
|
|
info!(version, "Migration applied");
|
|
}
|
|
|
|
Ok(())
|
|
}
|
|
|
|
#[allow(dead_code)]
|
|
pub fn run_migrations_from_dir(conn: &Connection, migrations_dir: &Path) -> Result<()> {
|
|
let has_version_table: bool = conn
|
|
.query_row(
|
|
"SELECT COUNT(*) > 0 FROM sqlite_master WHERE type='table' AND name='schema_version'",
|
|
[],
|
|
|row| row.get(0),
|
|
)
|
|
.unwrap_or(false);
|
|
|
|
let current_version: i32 = if has_version_table {
|
|
conn.query_row(
|
|
"SELECT COALESCE(MAX(version), 0) FROM schema_version",
|
|
[],
|
|
|row| row.get(0),
|
|
)
|
|
.unwrap_or(0)
|
|
} else {
|
|
0
|
|
};
|
|
|
|
let mut migrations: Vec<_> = fs::read_dir(migrations_dir)?
|
|
.filter_map(|entry| entry.ok())
|
|
.filter(|entry| entry.path().extension().is_some_and(|ext| ext == "sql"))
|
|
.collect();
|
|
|
|
migrations.sort_by_key(|entry| entry.file_name());
|
|
|
|
for entry in migrations {
|
|
let filename = entry.file_name();
|
|
let filename_str = filename.to_string_lossy();
|
|
|
|
let version: i32 = match filename_str.split('_').next().and_then(|v| v.parse().ok()) {
|
|
Some(v) => v,
|
|
None => continue,
|
|
};
|
|
|
|
if version <= current_version {
|
|
continue;
|
|
}
|
|
|
|
let sql = fs::read_to_string(entry.path())?;
|
|
|
|
let savepoint_name = format!("migration_{}", version);
|
|
conn.execute_batch(&format!("SAVEPOINT {}", savepoint_name))
|
|
.map_err(|e| LoreError::MigrationFailed {
|
|
version,
|
|
message: format!("Failed to create savepoint: {}", e),
|
|
source: Some(e),
|
|
})?;
|
|
|
|
match conn.execute_batch(&sql) {
|
|
Ok(()) => {
|
|
conn.execute_batch(&format!("RELEASE {}", savepoint_name))
|
|
.map_err(|e| LoreError::MigrationFailed {
|
|
version,
|
|
message: format!("Failed to release savepoint: {}", e),
|
|
source: Some(e),
|
|
})?;
|
|
}
|
|
Err(e) => {
|
|
let _ = conn.execute_batch(&format!("ROLLBACK TO {}", savepoint_name));
|
|
let _ = conn.execute_batch(&format!("RELEASE {}", savepoint_name));
|
|
return Err(LoreError::MigrationFailed {
|
|
version,
|
|
message: e.to_string(),
|
|
source: Some(e),
|
|
});
|
|
}
|
|
}
|
|
|
|
info!(version, file = %filename_str, "Migration applied");
|
|
}
|
|
|
|
Ok(())
|
|
}
|
|
|
|
pub fn verify_pragmas(conn: &Connection) -> (bool, Vec<String>) {
|
|
let mut issues = Vec::new();
|
|
|
|
let journal_mode: String = conn
|
|
.pragma_query_value(None, "journal_mode", |row| row.get(0))
|
|
.unwrap_or_default();
|
|
if journal_mode != "wal" {
|
|
issues.push(format!("journal_mode is {journal_mode}, expected 'wal'"));
|
|
}
|
|
|
|
let foreign_keys: i32 = conn
|
|
.pragma_query_value(None, "foreign_keys", |row| row.get(0))
|
|
.unwrap_or(0);
|
|
if foreign_keys != 1 {
|
|
issues.push(format!("foreign_keys is {foreign_keys}, expected 1"));
|
|
}
|
|
|
|
let busy_timeout: i32 = conn
|
|
.pragma_query_value(None, "busy_timeout", |row| row.get(0))
|
|
.unwrap_or(0);
|
|
if busy_timeout != 5000 {
|
|
issues.push(format!("busy_timeout is {busy_timeout}, expected 5000"));
|
|
}
|
|
|
|
let synchronous: i32 = conn
|
|
.pragma_query_value(None, "synchronous", |row| row.get(0))
|
|
.unwrap_or(0);
|
|
if synchronous != 1 {
|
|
issues.push(format!("synchronous is {synchronous}, expected 1 (NORMAL)"));
|
|
}
|
|
|
|
(issues.is_empty(), issues)
|
|
}
|
|
|
|
pub fn get_schema_version(conn: &Connection) -> i32 {
|
|
let has_version_table: bool = conn
|
|
.query_row(
|
|
"SELECT COUNT(*) > 0 FROM sqlite_master WHERE type='table' AND name='schema_version'",
|
|
[],
|
|
|row| row.get(0),
|
|
)
|
|
.unwrap_or(false);
|
|
|
|
if !has_version_table {
|
|
return 0;
|
|
}
|
|
|
|
conn.query_row(
|
|
"SELECT COALESCE(MAX(version), 0) FROM schema_version",
|
|
[],
|
|
|row| row.get(0),
|
|
)
|
|
.unwrap_or(0)
|
|
}
|
|
|
|
#[cfg(test)]
|
|
#[path = "db_tests.rs"]
|
|
mod tests;
|