Files
gitlore/src/core/db.rs
Taylor Eernisse 6b75697638 feat(ingestion): enrich issues with work item status from GraphQL API
Add a "Phase 1.5" status enrichment step to the issue ingestion pipeline
that fetches work item statuses via the GitLab GraphQL API after the
standard REST API ingestion completes.

Schema changes (migration 021):
- Add status_name, status_category, status_color, status_icon_name, and
  status_synced_at columns to the issues table (all nullable)

Ingestion pipeline changes:
- New `enrich_issue_statuses_txn()` function that applies fetched
  statuses in a single transaction with two phases: clear stale statuses
  for issues that no longer have a status widget, then apply new/updated
  statuses from the GraphQL response
- ProgressEvent variants for status enrichment (complete/skipped)
- IngestProjectResult tracks enrichment metrics (seen, enriched, cleared,
  without_widget, partial_error_count, enrichment_mode, errors)
- Robot mode JSON output includes per-project status enrichment details

Configuration:
- New `sync.fetchWorkItemStatus` config option (defaults true) to disable
  GraphQL status enrichment on instances without Premium/Ultimate
- `LoreError::GitLabAuthFailed` now treated as permanent API error so
  status enrichment auth failures don't trigger retries

Also removes the unnecessary nested SAVEPOINT in store_closes_issues_refs
(already runs within the orchestrator's transaction context).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-11 08:09:21 -05:00

301 lines
9.4 KiB
Rust

use rusqlite::Connection;
use sqlite_vec::sqlite3_vec_init;
use std::fs;
use std::path::Path;
use tracing::{debug, info};
use super::error::{LoreError, Result};
pub const LATEST_SCHEMA_VERSION: i32 = MIGRATIONS.len() as i32;
const MIGRATIONS: &[(&str, &str)] = &[
("001", include_str!("../../migrations/001_initial.sql")),
("002", include_str!("../../migrations/002_issues.sql")),
("003", include_str!("../../migrations/003_indexes.sql")),
(
"004",
include_str!("../../migrations/004_discussions_payload.sql"),
),
(
"005",
include_str!("../../migrations/005_assignees_milestone_duedate.sql"),
),
(
"006",
include_str!("../../migrations/006_merge_requests.sql"),
),
("007", include_str!("../../migrations/007_documents.sql")),
("008", include_str!("../../migrations/008_fts5.sql")),
("009", include_str!("../../migrations/009_embeddings.sql")),
("010", include_str!("../../migrations/010_chunk_config.sql")),
(
"011",
include_str!("../../migrations/011_resource_events.sql"),
),
(
"012",
include_str!("../../migrations/012_nullable_label_milestone.sql"),
),
(
"013",
include_str!("../../migrations/013_resource_event_watermarks.sql"),
),
(
"014",
include_str!("../../migrations/014_sync_runs_enrichment.sql"),
),
(
"015",
include_str!("../../migrations/015_commit_shas_and_closes_watermark.sql"),
),
(
"016",
include_str!("../../migrations/016_mr_file_changes.sql"),
),
("017", include_str!("../../migrations/017_who_indexes.sql")),
(
"018",
include_str!("../../migrations/018_fix_assignees_composite_index.sql"),
),
(
"019",
include_str!("../../migrations/019_list_performance.sql"),
),
(
"020",
include_str!("../../migrations/020_mr_diffs_watermark.sql"),
),
(
"021",
include_str!("../../migrations/021_work_item_status.sql"),
),
];
pub fn create_connection(db_path: &Path) -> Result<Connection> {
// SAFETY: `sqlite3_vec_init` is an extern "C" function provided by the sqlite-vec
// crate with the exact signature expected by `sqlite3_auto_extension`. The transmute
// converts the concrete function pointer to the `Option<unsafe extern "C" fn()>` type
// that the FFI expects. This is safe because:
// 1. The function is a C-ABI init callback with a stable signature.
// 2. SQLite calls it once per new connection, matching sqlite-vec's contract.
// 3. `sqlite3_auto_extension` is idempotent for the same function pointer.
#[allow(clippy::missing_transmute_annotations)]
unsafe {
rusqlite::ffi::sqlite3_auto_extension(Some(std::mem::transmute(
sqlite3_vec_init as *const (),
)));
}
if let Some(parent) = db_path.parent() {
fs::create_dir_all(parent)?;
}
let conn = Connection::open(db_path)?;
conn.pragma_update(None, "journal_mode", "WAL")?;
conn.pragma_update(None, "synchronous", "NORMAL")?;
conn.pragma_update(None, "foreign_keys", "ON")?;
conn.pragma_update(None, "busy_timeout", 5000)?;
conn.pragma_update(None, "temp_store", "MEMORY")?;
conn.pragma_update(None, "cache_size", -64000)?;
conn.pragma_update(None, "mmap_size", 268_435_456)?;
debug!(db_path = %db_path.display(), "Database connection created");
Ok(conn)
}
pub fn run_migrations(conn: &Connection) -> Result<()> {
let has_version_table: bool = conn
.query_row(
"SELECT COUNT(*) > 0 FROM sqlite_master WHERE type='table' AND name='schema_version'",
[],
|row| row.get(0),
)
.unwrap_or(false);
let current_version: i32 = if has_version_table {
conn.query_row(
"SELECT COALESCE(MAX(version), 0) FROM schema_version",
[],
|row| row.get(0),
)
.unwrap_or(0)
} else {
0
};
for (version_str, sql) in MIGRATIONS {
let version: i32 = version_str.parse().expect("Invalid migration version");
if version <= current_version {
debug!(version, "Migration already applied");
continue;
}
let savepoint_name = format!("migration_{}", version);
conn.execute_batch(&format!("SAVEPOINT {}", savepoint_name))
.map_err(|e| LoreError::MigrationFailed {
version,
message: format!("Failed to create savepoint: {}", e),
source: Some(e),
})?;
match conn.execute_batch(sql) {
Ok(()) => {
conn.execute_batch(&format!("RELEASE {}", savepoint_name))
.map_err(|e| LoreError::MigrationFailed {
version,
message: format!("Failed to release savepoint: {}", e),
source: Some(e),
})?;
}
Err(e) => {
let _ = conn.execute_batch(&format!("ROLLBACK TO {}", savepoint_name));
let _ = conn.execute_batch(&format!("RELEASE {}", savepoint_name));
return Err(LoreError::MigrationFailed {
version,
message: e.to_string(),
source: Some(e),
});
}
}
info!(version, "Migration applied");
}
Ok(())
}
#[allow(dead_code)]
pub fn run_migrations_from_dir(conn: &Connection, migrations_dir: &Path) -> Result<()> {
let has_version_table: bool = conn
.query_row(
"SELECT COUNT(*) > 0 FROM sqlite_master WHERE type='table' AND name='schema_version'",
[],
|row| row.get(0),
)
.unwrap_or(false);
let current_version: i32 = if has_version_table {
conn.query_row(
"SELECT COALESCE(MAX(version), 0) FROM schema_version",
[],
|row| row.get(0),
)
.unwrap_or(0)
} else {
0
};
let mut migrations: Vec<_> = fs::read_dir(migrations_dir)?
.filter_map(|entry| entry.ok())
.filter(|entry| entry.path().extension().is_some_and(|ext| ext == "sql"))
.collect();
migrations.sort_by_key(|entry| entry.file_name());
for entry in migrations {
let filename = entry.file_name();
let filename_str = filename.to_string_lossy();
let version: i32 = match filename_str.split('_').next().and_then(|v| v.parse().ok()) {
Some(v) => v,
None => continue,
};
if version <= current_version {
continue;
}
let sql = fs::read_to_string(entry.path())?;
let savepoint_name = format!("migration_{}", version);
conn.execute_batch(&format!("SAVEPOINT {}", savepoint_name))
.map_err(|e| LoreError::MigrationFailed {
version,
message: format!("Failed to create savepoint: {}", e),
source: Some(e),
})?;
match conn.execute_batch(&sql) {
Ok(()) => {
conn.execute_batch(&format!("RELEASE {}", savepoint_name))
.map_err(|e| LoreError::MigrationFailed {
version,
message: format!("Failed to release savepoint: {}", e),
source: Some(e),
})?;
}
Err(e) => {
let _ = conn.execute_batch(&format!("ROLLBACK TO {}", savepoint_name));
let _ = conn.execute_batch(&format!("RELEASE {}", savepoint_name));
return Err(LoreError::MigrationFailed {
version,
message: e.to_string(),
source: Some(e),
});
}
}
info!(version, file = %filename_str, "Migration applied");
}
Ok(())
}
pub fn verify_pragmas(conn: &Connection) -> (bool, Vec<String>) {
let mut issues = Vec::new();
let journal_mode: String = conn
.pragma_query_value(None, "journal_mode", |row| row.get(0))
.unwrap_or_default();
if journal_mode != "wal" {
issues.push(format!("journal_mode is {journal_mode}, expected 'wal'"));
}
let foreign_keys: i32 = conn
.pragma_query_value(None, "foreign_keys", |row| row.get(0))
.unwrap_or(0);
if foreign_keys != 1 {
issues.push(format!("foreign_keys is {foreign_keys}, expected 1"));
}
let busy_timeout: i32 = conn
.pragma_query_value(None, "busy_timeout", |row| row.get(0))
.unwrap_or(0);
if busy_timeout != 5000 {
issues.push(format!("busy_timeout is {busy_timeout}, expected 5000"));
}
let synchronous: i32 = conn
.pragma_query_value(None, "synchronous", |row| row.get(0))
.unwrap_or(0);
if synchronous != 1 {
issues.push(format!("synchronous is {synchronous}, expected 1 (NORMAL)"));
}
(issues.is_empty(), issues)
}
pub fn get_schema_version(conn: &Connection) -> i32 {
let has_version_table: bool = conn
.query_row(
"SELECT COUNT(*) > 0 FROM sqlite_master WHERE type='table' AND name='schema_version'",
[],
|row| row.get(0),
)
.unwrap_or(false);
if !has_version_table {
return 0;
}
conn.query_row(
"SELECT COALESCE(MAX(version), 0) FROM schema_version",
[],
|row| row.get(0),
)
.unwrap_or(0)
}