Files
gitlore/src/core/db.rs
Taylor Eernisse 121a634653 fix: critical data integrity — timeline dedup, discussion atomicity, index collision
Three correctness bugs found via peer code review:

1. TimelineEvent PartialEq/Ord omitted entity_type — issue #42 and MR #42
   with the same timestamp and event_type were treated as equal. In a
   BTreeSet or dedup, one would silently be dropped. Added entity_type to
   both PartialEq and Ord comparisons.

2. discussions.rs: store_payload() was called outside the transaction
   (on bare conn) while upsert_discussion/notes were inside. A crash
   between them left orphaned payload rows. Moved store_payload inside
   the unchecked_transaction block, matching mr_discussions.rs pattern.

3. Migration 017 created idx_issue_assignees_username(username, issue_id)
   but migration 005 already created the same index name with just
   (username). SQLite's IF NOT EXISTS silently skipped the composite
   version on every existing database. New migration 018 drops and
   recreates the index with correct composite columns.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-08 07:54:59 -05:00

287 lines
9.0 KiB
Rust

use rusqlite::Connection;
use sqlite_vec::sqlite3_vec_init;
use std::fs;
use std::path::Path;
use tracing::{debug, info};
use super::error::{LoreError, Result};
/// Schema version the database reaches once every embedded migration has been applied.
///
/// Derived from the number of entries in `MIGRATIONS`. That is only correct because the
/// entries are numbered contiguously from 1, which the `const` assertion below enforces at
/// compile time. The `as` cast cannot truncate in practice: the table holds a handful of entries.
pub const LATEST_SCHEMA_VERSION: i32 = MIGRATIONS.len() as i32;

/// Embedded migration scripts as `(version, sql)` pairs, in application order.
///
/// `version` is a zero-padded decimal string that `run_migrations` parses as an `i32`.
/// `sql` is the script text compiled into the binary via `include_str!`.
const MIGRATIONS: &[(&str, &str)] = &[
    ("001", include_str!("../../migrations/001_initial.sql")),
    ("002", include_str!("../../migrations/002_issues.sql")),
    ("003", include_str!("../../migrations/003_indexes.sql")),
    (
        "004",
        include_str!("../../migrations/004_discussions_payload.sql"),
    ),
    (
        "005",
        include_str!("../../migrations/005_assignees_milestone_duedate.sql"),
    ),
    (
        "006",
        include_str!("../../migrations/006_merge_requests.sql"),
    ),
    ("007", include_str!("../../migrations/007_documents.sql")),
    ("008", include_str!("../../migrations/008_fts5.sql")),
    ("009", include_str!("../../migrations/009_embeddings.sql")),
    ("010", include_str!("../../migrations/010_chunk_config.sql")),
    (
        "011",
        include_str!("../../migrations/011_resource_events.sql"),
    ),
    (
        "012",
        include_str!("../../migrations/012_nullable_label_milestone.sql"),
    ),
    (
        "013",
        include_str!("../../migrations/013_resource_event_watermarks.sql"),
    ),
    (
        "014",
        include_str!("../../migrations/014_sync_runs_enrichment.sql"),
    ),
    (
        "015",
        include_str!("../../migrations/015_commit_shas_and_closes_watermark.sql"),
    ),
    (
        "016",
        include_str!("../../migrations/016_mr_file_changes.sql"),
    ),
    ("017", include_str!("../../migrations/017_who_indexes.sql")),
    (
        "018",
        include_str!("../../migrations/018_fix_assignees_composite_index.sql"),
    ),
];

// Compile-time guard: every version string must be all decimal digits, and the versions must
// be exactly 1, 2, 3, ... in table order. Otherwise `LATEST_SCHEMA_VERSION` (derived from the
// table length) would disagree with the real highest version, or the `.expect` in
// `run_migrations` could panic at runtime. A violation here is a build error, not a crash.
const _: () = {
    let mut index = 0;
    while index < MIGRATIONS.len() {
        let digits = MIGRATIONS[index].0.as_bytes();
        assert!(!digits.is_empty(), "migration version string must not be empty");
        let mut value: usize = 0;
        let mut pos = 0;
        while pos < digits.len() {
            let byte = digits[pos];
            assert!(
                byte.is_ascii_digit(),
                "migration version must contain only decimal digits"
            );
            value = value * 10 + (byte - b'0') as usize;
            pos += 1;
        }
        assert!(
            value == index + 1,
            "migration versions must be contiguous, starting at 001"
        );
        index += 1;
    }
};
pub fn create_connection(db_path: &Path) -> Result<Connection> {
// SAFETY: `sqlite3_vec_init` is an extern "C" function provided by the sqlite-vec
// crate with the exact signature expected by `sqlite3_auto_extension`. The transmute
// converts the concrete function pointer to the `Option<unsafe extern "C" fn()>` type
// that the FFI expects. This is safe because:
// 1. The function is a C-ABI init callback with a stable signature.
// 2. SQLite calls it once per new connection, matching sqlite-vec's contract.
// 3. `sqlite3_auto_extension` is idempotent for the same function pointer.
#[allow(clippy::missing_transmute_annotations)]
unsafe {
rusqlite::ffi::sqlite3_auto_extension(Some(std::mem::transmute(
sqlite3_vec_init as *const (),
)));
}
if let Some(parent) = db_path.parent() {
fs::create_dir_all(parent)?;
}
let conn = Connection::open(db_path)?;
conn.pragma_update(None, "journal_mode", "WAL")?;
conn.pragma_update(None, "synchronous", "NORMAL")?;
conn.pragma_update(None, "foreign_keys", "ON")?;
conn.pragma_update(None, "busy_timeout", 5000)?;
conn.pragma_update(None, "temp_store", "MEMORY")?;
conn.pragma_update(None, "cache_size", -64000)?;
conn.pragma_update(None, "mmap_size", 268_435_456)?;
debug!(db_path = %db_path.display(), "Database connection created");
Ok(conn)
}
/// Applies every embedded migration newer than the database's current schema version.
///
/// The current version is `MAX(version)` from the `schema_version` table, or 0 when that table
/// does not exist yet. Each pending migration runs inside its own `SAVEPOINT`, so a failing
/// script leaves none of its changes behind and the connection is not left inside a
/// transaction.
///
/// # Errors
///
/// Returns an error if the current schema version cannot be read. Returns
/// [`LoreError::MigrationFailed`] if a savepoint cannot be created or released, or if a
/// migration script fails.
///
/// # Panics
///
/// Panics if a `MIGRATIONS` entry has a non-numeric version string, which is a programming
/// error in the embedded table.
pub fn run_migrations(conn: &Connection) -> Result<()> {
    // Propagate read failures: silently treating them as version 0 would re-run every
    // migration from 001 against an already-migrated database.
    let has_version_table: bool = conn.query_row(
        "SELECT COUNT(*) > 0 FROM sqlite_master WHERE type='table' AND name='schema_version'",
        [],
        |row| row.get(0),
    )?;
    let current_version: i32 = if has_version_table {
        conn.query_row(
            "SELECT COALESCE(MAX(version), 0) FROM schema_version",
            [],
            |row| row.get(0),
        )?
    } else {
        0
    };

    for (version_str, sql) in MIGRATIONS {
        let version: i32 = version_str.parse().expect("Invalid migration version");
        if version <= current_version {
            debug!(version, "Migration already applied");
            continue;
        }

        let savepoint_name = format!("migration_{}", version);
        conn.execute_batch(&format!("SAVEPOINT {}", savepoint_name))
            .map_err(|e| LoreError::MigrationFailed {
                version,
                message: format!("Failed to create savepoint: {}", e),
                source: Some(e),
            })?;

        if let Err(e) = conn.execute_batch(sql) {
            // `ROLLBACK TO` discards the migration's changes but does NOT end the savepoint.
            // It also does not end the transaction that `SAVEPOINT` started when none was open.
            // Release it afterwards so the connection is not left stuck in a transaction. Only
            // release after a successful rollback: releasing after a failed rollback would
            // commit the partial migration.
            if conn
                .execute_batch(&format!("ROLLBACK TO {}", savepoint_name))
                .is_ok()
            {
                let _ = conn.execute_batch(&format!("RELEASE {}", savepoint_name));
            }
            return Err(LoreError::MigrationFailed {
                version,
                message: e.to_string(),
                source: Some(e),
            });
        }

        conn.execute_batch(&format!("RELEASE {}", savepoint_name))
            .map_err(|e| LoreError::MigrationFailed {
                version,
                message: format!("Failed to release savepoint: {}", e),
                source: Some(e),
            })?;
        info!(version, "Migration applied");
    }
    Ok(())
}
/// Applies every `.sql` migration file in `migrations_dir` that is newer than the database's
/// current schema version.
///
/// A file's version is the integer before the first `_` in its name (e.g. `018_fix.sql` is 18).
/// `.sql` files without a numeric prefix are skipped. Files are applied in ascending numeric
/// version order, with ties broken by file name. Each file runs inside its own `SAVEPOINT`, so
/// a failing script leaves none of its changes behind and the connection is not left inside a
/// transaction.
///
/// # Errors
///
/// Returns an error if the schema version cannot be read, or if the directory, one of its
/// entries, or a migration file cannot be read. Returns [`LoreError::MigrationFailed`] if a
/// savepoint cannot be created or released, or if a script fails.
// NOTE(review): `dead_code` is allowed, presumably because nothing in the crate calls this;
// confirm whether it is still needed.
#[allow(dead_code)]
pub fn run_migrations_from_dir(conn: &Connection, migrations_dir: &Path) -> Result<()> {
    // Propagate read failures: silently treating them as version 0 would re-run every
    // migration against an already-migrated database.
    let has_version_table: bool = conn.query_row(
        "SELECT COUNT(*) > 0 FROM sqlite_master WHERE type='table' AND name='schema_version'",
        [],
        |row| row.get(0),
    )?;
    let current_version: i32 = if has_version_table {
        conn.query_row(
            "SELECT COALESCE(MAX(version), 0) FROM schema_version",
            [],
            |row| row.get(0),
        )?
    } else {
        0
    };

    // Collect (version, file name, path) for every versioned `.sql` file.
    let mut migrations = Vec::new();
    for entry in fs::read_dir(migrations_dir)? {
        let entry = entry?;
        let path = entry.path();
        if !path.extension().is_some_and(|ext| ext == "sql") {
            continue;
        }
        let file_name = entry.file_name().to_string_lossy().into_owned();
        let Some(version) = file_name
            .split('_')
            .next()
            .and_then(|v| v.parse::<i32>().ok())
        else {
            continue;
        };
        migrations.push((version, file_name, path));
    }
    // Sort by the parsed version, not the raw name: a lexicographic sort would apply
    // `10_x.sql` before `2_y.sql` when prefixes are not zero-padded.
    migrations.sort();

    for (version, file_name, path) in migrations {
        if version <= current_version {
            continue;
        }
        let sql = fs::read_to_string(&path)?;

        let savepoint_name = format!("migration_{}", version);
        conn.execute_batch(&format!("SAVEPOINT {}", savepoint_name))
            .map_err(|e| LoreError::MigrationFailed {
                version,
                message: format!("Failed to create savepoint: {}", e),
                source: Some(e),
            })?;

        if let Err(e) = conn.execute_batch(&sql) {
            // `ROLLBACK TO` discards the changes but keeps the savepoint, and any transaction
            // it started, open. Release it only after a successful rollback, so a failed
            // rollback never commits a partial migration.
            if conn
                .execute_batch(&format!("ROLLBACK TO {}", savepoint_name))
                .is_ok()
            {
                let _ = conn.execute_batch(&format!("RELEASE {}", savepoint_name));
            }
            return Err(LoreError::MigrationFailed {
                version,
                message: e.to_string(),
                source: Some(e),
            });
        }

        conn.execute_batch(&format!("RELEASE {}", savepoint_name))
            .map_err(|e| LoreError::MigrationFailed {
                version,
                message: format!("Failed to release savepoint: {}", e),
                source: Some(e),
            })?;
        info!(version, file = %file_name, "Migration applied");
    }
    Ok(())
}
/// Reports whether the key connection pragmas hold their expected values on `conn`.
///
/// Checks `journal_mode` (expects `wal`), `foreign_keys` (expects 1), `busy_timeout`
/// (expects 5000) and `synchronous` (expects 1, i.e. NORMAL). Returns `(ok, issues)`:
/// `issues` has one human-readable message per mismatching pragma, and `ok` is `true` exactly
/// when `issues` is empty. A pragma that cannot be read falls back to `""` (journal_mode) or
/// `0` (the integer pragmas), so it is reported as a mismatch.
pub fn verify_pragmas(conn: &Connection) -> (bool, Vec<String>) {
    // Integer-valued pragmas; read errors collapse to 0 so they show up as mismatches.
    let int_pragma = |name: &str| -> i32 {
        conn.pragma_query_value(None, name, |row| row.get(0))
            .unwrap_or(0)
    };

    let journal_mode: String = conn
        .pragma_query_value(None, "journal_mode", |row| row.get(0))
        .unwrap_or_default();
    let foreign_keys = int_pragma("foreign_keys");
    let busy_timeout = int_pragma("busy_timeout");
    let synchronous = int_pragma("synchronous");

    let mut problems: Vec<String> = Vec::new();
    if journal_mode != "wal" {
        problems.push(format!("journal_mode is {journal_mode}, expected 'wal'"));
    }
    if foreign_keys != 1 {
        problems.push(format!("foreign_keys is {foreign_keys}, expected 1"));
    }
    if busy_timeout != 5000 {
        problems.push(format!("busy_timeout is {busy_timeout}, expected 5000"));
    }
    if synchronous != 1 {
        problems.push(format!("synchronous is {synchronous}, expected 1 (NORMAL)"));
    }

    let all_ok = problems.is_empty();
    (all_ok, problems)
}
/// Returns the database's current schema version: `MAX(version)` from `schema_version`.
///
/// Yields 0 when the `schema_version` table does not exist or is empty, and also when either
/// query fails. Errors are collapsed to 0 rather than propagated.
pub fn get_schema_version(conn: &Connection) -> i32 {
    let table_check = conn.query_row::<bool, _, _>(
        "SELECT COUNT(*) > 0 FROM sqlite_master WHERE type='table' AND name='schema_version'",
        [],
        |row| row.get(0),
    );
    // Missing table, or the existence check itself failed: report version 0.
    let Ok(true) = table_check else {
        return 0;
    };
    conn.query_row(
        "SELECT COALESCE(MAX(version), 0) FROM schema_version",
        [],
        |row| row.get(0),
    )
    .unwrap_or(0)
}