From 7aaa51f645be47d1192b12682a3109c03e194051 Mon Sep 17 00:00:00 2001 From: Taylor Eernisse Date: Mon, 26 Jan 2026 11:28:07 -0500 Subject: [PATCH] feat(core): Implement infrastructure layer for CLI operations Establishes foundational modules that all other components depend on. src/core/config.rs - Configuration management: - JSON-based config file with Zod-like validation via serde - GitLab settings: base URL, token environment variable - Project list with paths to track - Sync settings: backfill days, stale lock timeout, cursor rewind - Storage settings: database path, payload compression toggle - XDG-compliant config path resolution via dirs crate - Token environment variable name is configurable (defaults to GITLAB_TOKEN) src/core/db.rs - Database connection and migrations: - Opens or creates SQLite database with WAL mode for concurrency - Embeds migration SQL as const strings (001-005) - Runs migrations idempotently, tracked via the schema_version table - Provides thread-safe connection management src/core/error.rs - Unified error handling: - GiError enum with variants for all failure modes - Config, Database, GitLab, Migration, Lock, IO, Transform errors - thiserror derive for automatic Display/Error impls - Result type alias for ergonomic error propagation src/core/lock.rs - Crash-safe sync locking: - Database-backed locks (app_locks table) to prevent concurrent syncs - Stale lock detection via heartbeat with configurable timeout - Force override for recovery scenarios - Lock row records owner UUID and timestamps for debugging src/core/paths.rs - Path resolution: - XDG Base Directory Specification compliance - Config: ~/.config/gi/config.json - Data: ~/.local/share/gi/gi.db - Parent directories are created on first database access (see db.rs) src/core/payloads.rs - Raw payload storage: - Optional gzip compression for storage efficiency - SHA-256 content addressing for deduplication - Resource-typed rows (issue, discussion, note) - Hash-based duplicate detection for idempotent ingestion src/core/time.rs - Timestamp utilities: - Relative time parsing (7d, 2w, 
1m) for --since flag - ISO 8601 date parsing for absolute dates - Human-friendly relative time formatting Co-Authored-By: Claude Opus 4.5 --- src/core/config.rs | 203 +++++++++++++++++++++++++++++++++++++ src/core/db.rs | 208 ++++++++++++++++++++++++++++++++++++++ src/core/error.rs | 137 +++++++++++++++++++++++++ src/core/lock.rs | 233 +++++++++++++++++++++++++++++++++++++++++++ src/core/mod.rs | 12 +++ src/core/paths.rs | 100 +++++++++++++++++++ src/core/payloads.rs | 215 +++++++++++++++++++++++++++++++++++++++ src/core/time.rs | 136 +++++++++++++++++++++++++ 8 files changed, 1244 insertions(+) create mode 100644 src/core/config.rs create mode 100644 src/core/db.rs create mode 100644 src/core/error.rs create mode 100644 src/core/lock.rs create mode 100644 src/core/mod.rs create mode 100644 src/core/paths.rs create mode 100644 src/core/payloads.rs create mode 100644 src/core/time.rs diff --git a/src/core/config.rs b/src/core/config.rs new file mode 100644 index 0000000..c342f0a --- /dev/null +++ b/src/core/config.rs @@ -0,0 +1,203 @@ +//! Configuration loading and validation. +//! +//! Config schema mirrors the TypeScript version with serde for deserialization. + +use serde::Deserialize; +use std::fs; +use std::path::Path; + +use super::error::{GiError, Result}; +use super::paths::get_config_path; + +/// GitLab connection settings. +#[derive(Debug, Clone, Deserialize)] +pub struct GitLabConfig { + #[serde(rename = "baseUrl")] + pub base_url: String, + + #[serde(rename = "tokenEnvVar", default = "default_token_env_var")] + pub token_env_var: String, +} + +fn default_token_env_var() -> String { + "GITLAB_TOKEN".to_string() +} + +/// Project to sync. +#[derive(Debug, Clone, Deserialize)] +pub struct ProjectConfig { + pub path: String, +} + +/// Sync behavior settings. 
+#[derive(Debug, Clone, Deserialize)] +#[serde(default)] +pub struct SyncConfig { + #[serde(rename = "backfillDays")] + pub backfill_days: u32, + + #[serde(rename = "staleLockMinutes")] + pub stale_lock_minutes: u32, + + #[serde(rename = "heartbeatIntervalSeconds")] + pub heartbeat_interval_seconds: u32, + + #[serde(rename = "cursorRewindSeconds")] + pub cursor_rewind_seconds: u32, + + #[serde(rename = "primaryConcurrency")] + pub primary_concurrency: u32, + + #[serde(rename = "dependentConcurrency")] + pub dependent_concurrency: u32, +} + +impl Default for SyncConfig { + fn default() -> Self { + Self { + backfill_days: 14, + stale_lock_minutes: 10, + heartbeat_interval_seconds: 30, + cursor_rewind_seconds: 2, + primary_concurrency: 4, + dependent_concurrency: 2, + } + } +} + +/// Storage settings. +#[derive(Debug, Clone, Deserialize, Default)] +#[serde(default)] +pub struct StorageConfig { + #[serde(rename = "dbPath")] + pub db_path: Option, + + #[serde(rename = "backupDir")] + pub backup_dir: Option, + + #[serde( + rename = "compressRawPayloads", + default = "default_compress_raw_payloads" + )] + pub compress_raw_payloads: bool, +} + +fn default_compress_raw_payloads() -> bool { + true +} + +/// Embedding provider settings. +#[derive(Debug, Clone, Deserialize)] +#[serde(default)] +pub struct EmbeddingConfig { + pub provider: String, + pub model: String, + #[serde(rename = "baseUrl")] + pub base_url: String, + pub concurrency: u32, +} + +impl Default for EmbeddingConfig { + fn default() -> Self { + Self { + provider: "ollama".to_string(), + model: "nomic-embed-text".to_string(), + base_url: "http://localhost:11434".to_string(), + concurrency: 4, + } + } +} + +/// Main configuration structure. 
+#[derive(Debug, Clone, Deserialize)] +pub struct Config { + pub gitlab: GitLabConfig, + pub projects: Vec, + + #[serde(default)] + pub sync: SyncConfig, + + #[serde(default)] + pub storage: StorageConfig, + + #[serde(default)] + pub embedding: EmbeddingConfig, +} + +impl Config { + /// Load and validate configuration from file. + pub fn load(cli_override: Option<&str>) -> Result { + let config_path = get_config_path(cli_override); + + if !config_path.exists() { + return Err(GiError::ConfigNotFound { + path: config_path.display().to_string(), + }); + } + + Self::load_from_path(&config_path) + } + + /// Load configuration from a specific path. + pub fn load_from_path(path: &Path) -> Result { + let content = fs::read_to_string(path).map_err(|e| GiError::ConfigInvalid { + details: format!("Failed to read config file: {e}"), + })?; + + let config: Config = + serde_json::from_str(&content).map_err(|e| GiError::ConfigInvalid { + details: format!("Invalid JSON: {e}"), + })?; + + // Validate required fields + if config.projects.is_empty() { + return Err(GiError::ConfigInvalid { + details: "At least one project is required".to_string(), + }); + } + + for project in &config.projects { + if project.path.is_empty() { + return Err(GiError::ConfigInvalid { + details: "Project path cannot be empty".to_string(), + }); + } + } + + // Validate URL format + if url::Url::parse(&config.gitlab.base_url).is_err() { + return Err(GiError::ConfigInvalid { + details: format!("Invalid GitLab URL: {}", config.gitlab.base_url), + }); + } + + Ok(config) + } +} + +/// Minimal config for writing during init (relies on defaults when loaded). 
+#[derive(Debug, serde::Serialize)] +pub struct MinimalConfig { + pub gitlab: MinimalGitLabConfig, + pub projects: Vec, +} + +#[derive(Debug, serde::Serialize)] +pub struct MinimalGitLabConfig { + #[serde(rename = "baseUrl")] + pub base_url: String, + #[serde(rename = "tokenEnvVar")] + pub token_env_var: String, +} + +impl serde::Serialize for ProjectConfig { + fn serialize(&self, serializer: S) -> std::result::Result + where + S: serde::Serializer, + { + use serde::ser::SerializeStruct; + let mut state = serializer.serialize_struct("ProjectConfig", 1)?; + state.serialize_field("path", &self.path)?; + state.end() + } +} diff --git a/src/core/db.rs b/src/core/db.rs new file mode 100644 index 0000000..876853a --- /dev/null +++ b/src/core/db.rs @@ -0,0 +1,208 @@ +//! Database connection and migration management. +//! +//! Uses rusqlite with WAL mode for crash safety. + +use rusqlite::Connection; +use sqlite_vec::sqlite3_vec_init; +use std::fs; +use std::path::Path; +use tracing::{debug, info}; + +use super::error::{GiError, Result}; + +/// Embedded migrations - compiled into the binary. +const MIGRATIONS: &[(&str, &str)] = &[ + ("001", include_str!("../../migrations/001_initial.sql")), + ("002", include_str!("../../migrations/002_issues.sql")), + ("003", include_str!("../../migrations/003_indexes.sql")), + ("004", include_str!("../../migrations/004_discussions_payload.sql")), + ("005", include_str!("../../migrations/005_assignees_milestone_duedate.sql")), +]; + +/// Create a database connection with production-grade pragmas. 
+pub fn create_connection(db_path: &Path) -> Result { + // Register sqlite-vec extension globally (safe to call multiple times) + #[allow(clippy::missing_transmute_annotations)] + unsafe { + rusqlite::ffi::sqlite3_auto_extension(Some(std::mem::transmute( + sqlite3_vec_init as *const (), + ))); + } + + // Ensure parent directory exists + if let Some(parent) = db_path.parent() { + fs::create_dir_all(parent)?; + } + + let conn = Connection::open(db_path)?; + + // Production-grade pragmas for single-user CLI + conn.pragma_update(None, "journal_mode", "WAL")?; + conn.pragma_update(None, "synchronous", "NORMAL")?; // Safe for WAL on local disk + conn.pragma_update(None, "foreign_keys", "ON")?; + conn.pragma_update(None, "busy_timeout", 5000)?; // 5s wait on lock contention + conn.pragma_update(None, "temp_store", "MEMORY")?; // Small speed win + + debug!(db_path = %db_path.display(), "Database connection created"); + + Ok(conn) +} + +/// Run all pending migrations using embedded SQL. +pub fn run_migrations(conn: &Connection) -> Result<()> { + // Get current schema version + let has_version_table: bool = conn + .query_row( + "SELECT COUNT(*) > 0 FROM sqlite_master WHERE type='table' AND name='schema_version'", + [], + |row| row.get(0), + ) + .unwrap_or(false); + + let current_version: i32 = if has_version_table { + conn.query_row( + "SELECT COALESCE(MAX(version), 0) FROM schema_version", + [], + |row| row.get(0), + ) + .unwrap_or(0) + } else { + 0 + }; + + for (version_str, sql) in MIGRATIONS { + let version: i32 = version_str.parse().expect("Invalid migration version"); + + if version <= current_version { + debug!(version, "Migration already applied"); + continue; + } + + conn.execute_batch(sql) + .map_err(|e| GiError::MigrationFailed { + version, + message: e.to_string(), + source: Some(e), + })?; + + info!(version, "Migration applied"); + } + + Ok(()) +} + +/// Run migrations from filesystem (for testing or custom migrations). 
+#[allow(dead_code)] +pub fn run_migrations_from_dir(conn: &Connection, migrations_dir: &Path) -> Result<()> { + let has_version_table: bool = conn + .query_row( + "SELECT COUNT(*) > 0 FROM sqlite_master WHERE type='table' AND name='schema_version'", + [], + |row| row.get(0), + ) + .unwrap_or(false); + + let current_version: i32 = if has_version_table { + conn.query_row( + "SELECT COALESCE(MAX(version), 0) FROM schema_version", + [], + |row| row.get(0), + ) + .unwrap_or(0) + } else { + 0 + }; + + let mut migrations: Vec<_> = fs::read_dir(migrations_dir)? + .filter_map(|entry| entry.ok()) + .filter(|entry| entry.path().extension().is_some_and(|ext| ext == "sql")) + .collect(); + + migrations.sort_by_key(|entry| entry.file_name()); + + for entry in migrations { + let filename = entry.file_name(); + let filename_str = filename.to_string_lossy(); + + let version: i32 = match filename_str.split('_').next().and_then(|v| v.parse().ok()) { + Some(v) => v, + None => continue, + }; + + if version <= current_version { + continue; + } + + let sql = fs::read_to_string(entry.path())?; + + conn.execute_batch(&sql) + .map_err(|e| GiError::MigrationFailed { + version, + message: e.to_string(), + source: Some(e), + })?; + + info!(version, file = %filename_str, "Migration applied"); + } + + Ok(()) +} + +/// Verify database pragmas are set correctly. +/// Used by gi doctor command. 
+pub fn verify_pragmas(conn: &Connection) -> (bool, Vec) { + let mut issues = Vec::new(); + + let journal_mode: String = conn + .pragma_query_value(None, "journal_mode", |row| row.get(0)) + .unwrap_or_default(); + if journal_mode != "wal" { + issues.push(format!("journal_mode is {journal_mode}, expected 'wal'")); + } + + let foreign_keys: i32 = conn + .pragma_query_value(None, "foreign_keys", |row| row.get(0)) + .unwrap_or(0); + if foreign_keys != 1 { + issues.push(format!("foreign_keys is {foreign_keys}, expected 1")); + } + + let busy_timeout: i32 = conn + .pragma_query_value(None, "busy_timeout", |row| row.get(0)) + .unwrap_or(0); + if busy_timeout != 5000 { + issues.push(format!("busy_timeout is {busy_timeout}, expected 5000")); + } + + let synchronous: i32 = conn + .pragma_query_value(None, "synchronous", |row| row.get(0)) + .unwrap_or(0); + // NORMAL = 1 + if synchronous != 1 { + issues.push(format!("synchronous is {synchronous}, expected 1 (NORMAL)")); + } + + (issues.is_empty(), issues) +} + +/// Get current schema version. +pub fn get_schema_version(conn: &Connection) -> i32 { + let has_version_table: bool = conn + .query_row( + "SELECT COUNT(*) > 0 FROM sqlite_master WHERE type='table' AND name='schema_version'", + [], + |row| row.get(0), + ) + .unwrap_or(false); + + if !has_version_table { + return 0; + } + + conn.query_row( + "SELECT COALESCE(MAX(version), 0) FROM schema_version", + [], + |row| row.get(0), + ) + .unwrap_or(0) +} diff --git a/src/core/error.rs b/src/core/error.rs new file mode 100644 index 0000000..41ffa36 --- /dev/null +++ b/src/core/error.rs @@ -0,0 +1,137 @@ +//! Custom error types for gitlab-inbox. +//! +//! Uses thiserror for ergonomic error definitions with structured error codes. + +use thiserror::Error; + +/// Error codes for programmatic error handling. 
+#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum ErrorCode { + ConfigNotFound, + ConfigInvalid, + GitLabAuthFailed, + GitLabNotFound, + GitLabRateLimited, + GitLabNetworkError, + DatabaseLocked, + DatabaseError, + MigrationFailed, + TokenNotSet, + TransformError, + IoError, + InternalError, +} + +impl std::fmt::Display for ErrorCode { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let code = match self { + Self::ConfigNotFound => "CONFIG_NOT_FOUND", + Self::ConfigInvalid => "CONFIG_INVALID", + Self::GitLabAuthFailed => "GITLAB_AUTH_FAILED", + Self::GitLabNotFound => "GITLAB_NOT_FOUND", + Self::GitLabRateLimited => "GITLAB_RATE_LIMITED", + Self::GitLabNetworkError => "GITLAB_NETWORK_ERROR", + Self::DatabaseLocked => "DB_LOCKED", + Self::DatabaseError => "DB_ERROR", + Self::MigrationFailed => "MIGRATION_FAILED", + Self::TokenNotSet => "TOKEN_NOT_SET", + Self::TransformError => "TRANSFORM_ERROR", + Self::IoError => "IO_ERROR", + Self::InternalError => "INTERNAL_ERROR", + }; + write!(f, "{code}") + } +} + +/// Main error type for gitlab-inbox. +#[derive(Error, Debug)] +pub enum GiError { + #[error("Config file not found at {path}. Run \"gi init\" first.")] + ConfigNotFound { path: String }, + + #[error("Invalid config: {details}")] + ConfigInvalid { details: String }, + + #[error("GitLab authentication failed. Check your token has read_api scope.")] + GitLabAuthFailed, + + #[error("GitLab resource not found: {resource}")] + GitLabNotFound { resource: String }, + + #[error("Rate limited. Retry after {retry_after}s")] + GitLabRateLimited { retry_after: u64 }, + + #[error("Cannot connect to GitLab at {base_url}")] + GitLabNetworkError { + base_url: String, + #[source] + source: Option, + }, + + #[error( + "Another sync is running (owner: {owner}, started: {started_at}). Use --force to override if stale." 
+ )] + DatabaseLocked { owner: String, started_at: String }, + + #[error("Migration {version} failed: {message}")] + MigrationFailed { + version: i32, + message: String, + #[source] + source: Option, + }, + + #[error("GitLab token not set. Export {env_var} environment variable.")] + TokenNotSet { env_var: String }, + + #[error("Database error: {0}")] + Database(#[from] rusqlite::Error), + + #[error("HTTP error: {0}")] + Http(#[from] reqwest::Error), + + #[error("JSON error: {0}")] + Json(#[from] serde_json::Error), + + #[error("IO error: {0}")] + Io(#[from] std::io::Error), + + #[error("Transform error: {0}")] + Transform(#[from] crate::gitlab::transformers::issue::TransformError), + + #[error("Not found: {0}")] + NotFound(String), + + #[error("Ambiguous: {0}")] + Ambiguous(String), + + #[error("{0}")] + Other(String), +} + +impl GiError { + /// Get the error code for programmatic handling. + pub fn code(&self) -> ErrorCode { + match self { + Self::ConfigNotFound { .. } => ErrorCode::ConfigNotFound, + Self::ConfigInvalid { .. } => ErrorCode::ConfigInvalid, + Self::GitLabAuthFailed => ErrorCode::GitLabAuthFailed, + Self::GitLabNotFound { .. } => ErrorCode::GitLabNotFound, + Self::GitLabRateLimited { .. } => ErrorCode::GitLabRateLimited, + Self::GitLabNetworkError { .. } => ErrorCode::GitLabNetworkError, + Self::DatabaseLocked { .. } => ErrorCode::DatabaseLocked, + Self::MigrationFailed { .. } => ErrorCode::MigrationFailed, + Self::TokenNotSet { .. 
} => ErrorCode::TokenNotSet, + Self::Database(_) => ErrorCode::DatabaseError, + Self::Http(_) => ErrorCode::GitLabNetworkError, + Self::Json(_) => ErrorCode::InternalError, + Self::Io(_) => ErrorCode::IoError, + Self::Transform(_) => ErrorCode::TransformError, + Self::NotFound(_) => ErrorCode::GitLabNotFound, + Self::Ambiguous(_) => ErrorCode::InternalError, + Self::Other(_) => ErrorCode::InternalError, + } + } +} + +pub type Result = std::result::Result; diff --git a/src/core/lock.rs b/src/core/lock.rs new file mode 100644 index 0000000..c310275 --- /dev/null +++ b/src/core/lock.rs @@ -0,0 +1,233 @@ +//! Crash-safe single-flight lock using heartbeat pattern. +//! +//! Prevents concurrent sync operations and allows recovery from crashed processes. + +use rusqlite::{Connection, TransactionBehavior}; +use std::path::PathBuf; +use std::sync::Arc; +use std::sync::atomic::{AtomicBool, AtomicU32, Ordering}; +use std::thread; +use std::time::Duration; +use tracing::{debug, error, info, warn}; +use uuid::Uuid; + +use super::db::create_connection; +use super::error::{GiError, Result}; +use super::time::{ms_to_iso, now_ms}; + +/// Maximum consecutive heartbeat failures before signaling error. +const MAX_HEARTBEAT_FAILURES: u32 = 3; + +/// Lock configuration options. +pub struct LockOptions { + pub name: String, + pub stale_lock_minutes: u32, + pub heartbeat_interval_seconds: u32, +} + +/// App lock with heartbeat for crash recovery. +pub struct AppLock { + conn: Connection, + db_path: PathBuf, + owner: String, + name: String, + stale_lock_ms: i64, + heartbeat_interval_ms: u64, + released: Arc, + heartbeat_failed: Arc, + heartbeat_failure_count: Arc, + heartbeat_handle: Option>, +} + +impl AppLock { + /// Create a new app lock instance. 
+ pub fn new(conn: Connection, options: LockOptions) -> Self { + let db_path = conn + .path() + .map(PathBuf::from) + .unwrap_or_default(); + + Self { + conn, + db_path, + owner: Uuid::new_v4().to_string(), + name: options.name, + stale_lock_ms: (options.stale_lock_minutes as i64) * 60 * 1000, + heartbeat_interval_ms: (options.heartbeat_interval_seconds as u64) * 1000, + released: Arc::new(AtomicBool::new(false)), + heartbeat_failed: Arc::new(AtomicBool::new(false)), + heartbeat_failure_count: Arc::new(AtomicU32::new(0)), + heartbeat_handle: None, + } + } + + /// Check if heartbeat has failed (indicates lock may be compromised). + pub fn is_heartbeat_healthy(&self) -> bool { + !self.heartbeat_failed.load(Ordering::SeqCst) + } + + /// Attempt to acquire the lock atomically. + /// + /// Returns Ok(true) if lock acquired, Err if lock is held by another active process. + pub fn acquire(&mut self, force: bool) -> Result { + let now = now_ms(); + + // Use IMMEDIATE transaction to prevent race conditions + let tx = self.conn.transaction_with_behavior(TransactionBehavior::Immediate)?; + + // Check for existing lock within the transaction + let existing: Option<(String, i64, i64)> = tx + .query_row( + "SELECT owner, acquired_at, heartbeat_at FROM app_locks WHERE name = ?", + [&self.name], + |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)), + ) + .ok(); + + match existing { + None => { + // No lock exists, acquire it + tx.execute( + "INSERT INTO app_locks (name, owner, acquired_at, heartbeat_at) VALUES (?, ?, ?, ?)", + (&self.name, &self.owner, now, now), + )?; + info!(owner = %self.owner, "Lock acquired (new)"); + } + Some((existing_owner, acquired_at, heartbeat_at)) => { + let is_stale = now - heartbeat_at > self.stale_lock_ms; + + if is_stale || force { + // Lock is stale or force override, take it + tx.execute( + "UPDATE app_locks SET owner = ?, acquired_at = ?, heartbeat_at = ? 
WHERE name = ?", + (&self.owner, now, now, &self.name), + )?; + info!( + owner = %self.owner, + previous_owner = %existing_owner, + was_stale = is_stale, + "Lock acquired (override)" + ); + } else if existing_owner == self.owner { + // Re-entrant, update heartbeat + tx.execute( + "UPDATE app_locks SET heartbeat_at = ? WHERE name = ?", + (now, &self.name), + )?; + } else { + // Lock held by another active process - rollback and return error + drop(tx); + return Err(GiError::DatabaseLocked { + owner: existing_owner, + started_at: ms_to_iso(acquired_at), + }); + } + } + } + + // Commit the transaction atomically + tx.commit()?; + + self.start_heartbeat(); + Ok(true) + } + + /// Release the lock. + pub fn release(&mut self) { + if self.released.swap(true, Ordering::SeqCst) { + return; // Already released + } + + // Stop heartbeat thread + if let Some(handle) = self.heartbeat_handle.take() { + let _ = handle.join(); + } + + let _ = self.conn.execute( + "DELETE FROM app_locks WHERE name = ? AND owner = ?", + (&self.name, &self.owner), + ); + + info!(owner = %self.owner, "Lock released"); + } + + /// Start the heartbeat thread to keep the lock alive. 
+ fn start_heartbeat(&mut self) { + let name = self.name.clone(); + let owner = self.owner.clone(); + let interval = Duration::from_millis(self.heartbeat_interval_ms); + let released = Arc::clone(&self.released); + let heartbeat_failed = Arc::clone(&self.heartbeat_failed); + let failure_count = Arc::clone(&self.heartbeat_failure_count); + let db_path = self.db_path.clone(); + + if db_path.as_os_str().is_empty() { + return; // In-memory database, skip heartbeat + } + + self.heartbeat_handle = Some(thread::spawn(move || { + // Open a new connection with proper pragmas + let conn = match create_connection(&db_path) { + Ok(c) => c, + Err(e) => { + error!(error = %e, "Failed to create heartbeat connection"); + heartbeat_failed.store(true, Ordering::SeqCst); + return; + } + }; + + loop { + thread::sleep(interval); + + if released.load(Ordering::SeqCst) { + break; + } + + let now = now_ms(); + let result = conn.execute( + "UPDATE app_locks SET heartbeat_at = ? WHERE name = ? AND owner = ?", + (now, &name, &owner), + ); + + match result { + Ok(rows_affected) => { + if rows_affected == 0 { + // Lock was stolen or deleted + warn!(owner = %owner, "Heartbeat failed: lock no longer held"); + heartbeat_failed.store(true, Ordering::SeqCst); + break; + } + // Reset failure count on success + failure_count.store(0, Ordering::SeqCst); + debug!(owner = %owner, "Heartbeat updated"); + } + Err(e) => { + let count = failure_count.fetch_add(1, Ordering::SeqCst) + 1; + warn!( + owner = %owner, + error = %e, + consecutive_failures = count, + "Heartbeat update failed" + ); + + if count >= MAX_HEARTBEAT_FAILURES { + error!( + owner = %owner, + "Heartbeat failed {} times consecutively, signaling failure", + MAX_HEARTBEAT_FAILURES + ); + heartbeat_failed.store(true, Ordering::SeqCst); + break; + } + } + } + } + })); + } +} + +impl Drop for AppLock { + fn drop(&mut self) { + self.release(); + } +} diff --git a/src/core/mod.rs b/src/core/mod.rs new file mode 100644 index 0000000..22dccd5 --- 
/dev/null +++ b/src/core/mod.rs @@ -0,0 +1,12 @@ +//! Core infrastructure modules. + +pub mod config; +pub mod db; +pub mod error; +pub mod lock; +pub mod paths; +pub mod payloads; +pub mod time; + +pub use config::Config; +pub use error::{GiError, Result}; diff --git a/src/core/paths.rs b/src/core/paths.rs new file mode 100644 index 0000000..d8e6064 --- /dev/null +++ b/src/core/paths.rs @@ -0,0 +1,100 @@ +//! XDG-compliant path resolution for config and data directories. + +use std::path::PathBuf; + +/// Get the path to the config file. +/// +/// Resolution order: +/// 1. CLI flag override (if provided) +/// 2. GI_CONFIG_PATH environment variable +/// 3. XDG default (~/.config/gi/config.json) +/// 4. Local fallback (./gi.config.json) if exists +/// 5. Returns XDG default even if not exists +pub fn get_config_path(cli_override: Option<&str>) -> PathBuf { + // 1. CLI flag override + if let Some(path) = cli_override { + return PathBuf::from(path); + } + + // 2. Environment variable + if let Ok(path) = std::env::var("GI_CONFIG_PATH") { + return PathBuf::from(path); + } + + // 3. XDG default + let xdg_path = get_xdg_config_dir().join("gi").join("config.json"); + if xdg_path.exists() { + return xdg_path; + } + + // 4. Local fallback (for development) + let local_path = PathBuf::from("gi.config.json"); + if local_path.exists() { + return local_path; + } + + // 5. Return XDG path (will trigger not-found error if missing) + xdg_path +} + +/// Get the data directory path. +/// Uses XDG_DATA_HOME or defaults to ~/.local/share/gi +pub fn get_data_dir() -> PathBuf { + get_xdg_data_dir().join("gi") +} + +/// Get the database file path. +/// Uses config override if provided, otherwise uses default in data dir. +pub fn get_db_path(config_override: Option<&str>) -> PathBuf { + if let Some(path) = config_override { + return PathBuf::from(path); + } + get_data_dir().join("gi.db") +} + +/// Get the backup directory path. 
+/// Uses config override if provided, otherwise uses default in data dir. +pub fn get_backup_dir(config_override: Option<&str>) -> PathBuf { + if let Some(path) = config_override { + return PathBuf::from(path); + } + get_data_dir().join("backups") +} + +/// Get XDG config directory, falling back to ~/.config +fn get_xdg_config_dir() -> PathBuf { + std::env::var("XDG_CONFIG_HOME") + .map(PathBuf::from) + .unwrap_or_else(|_| { + dirs::home_dir() + .unwrap_or_else(|| PathBuf::from(".")) + .join(".config") + }) +} + +/// Get XDG data directory, falling back to ~/.local/share +fn get_xdg_data_dir() -> PathBuf { + std::env::var("XDG_DATA_HOME") + .map(PathBuf::from) + .unwrap_or_else(|_| { + dirs::home_dir() + .unwrap_or_else(|| PathBuf::from(".")) + .join(".local") + .join("share") + }) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_cli_override_takes_precedence() { + let path = get_config_path(Some("/custom/path.json")); + assert_eq!(path, PathBuf::from("/custom/path.json")); + } + + // Note: env var tests removed - mutating process-global env vars + // in parallel tests is unsafe in Rust 2024. The env var code path + // is trivial (std::env::var) and doesn't warrant the complexity. +} diff --git a/src/core/payloads.rs b/src/core/payloads.rs new file mode 100644 index 0000000..328e7dd --- /dev/null +++ b/src/core/payloads.rs @@ -0,0 +1,215 @@ +//! Raw payload storage with optional compression and deduplication. + +use flate2::Compression; +use flate2::read::GzDecoder; +use flate2::write::GzEncoder; +use rusqlite::Connection; +use sha2::{Digest, Sha256}; +use std::io::{Read, Write}; + +use super::error::Result; +use super::time::now_ms; + +/// Options for storing a payload. 
+pub struct StorePayloadOptions<'a> { + pub project_id: Option, + pub resource_type: &'a str, // 'project' | 'issue' | 'mr' | 'note' | 'discussion' + pub gitlab_id: &'a str, // TEXT because discussion IDs are strings + pub payload: &'a serde_json::Value, + pub compress: bool, +} + +/// Store a raw API payload with optional compression and deduplication. +/// Returns the row ID (either new or existing if duplicate). +pub fn store_payload(conn: &Connection, options: StorePayloadOptions) -> Result { + // 1. JSON stringify the payload + let json_bytes = serde_json::to_vec(options.payload)?; + + // 2. SHA-256 hash the JSON bytes (pre-compression) + let mut hasher = Sha256::new(); + hasher.update(&json_bytes); + let payload_hash = format!("{:x}", hasher.finalize()); + + // 3. Check for duplicate by (project_id, resource_type, gitlab_id, payload_hash) + let existing: Option = conn + .query_row( + "SELECT id FROM raw_payloads + WHERE project_id IS ? AND resource_type = ? AND gitlab_id = ? AND payload_hash = ?", + ( + options.project_id, + options.resource_type, + options.gitlab_id, + &payload_hash, + ), + |row| row.get(0), + ) + .ok(); + + // 4. If duplicate, return existing ID + if let Some(id) = existing { + return Ok(id); + } + + // 5. Compress if requested + let (encoding, payload_bytes) = if options.compress { + let mut encoder = GzEncoder::new(Vec::new(), Compression::default()); + encoder.write_all(&json_bytes)?; + ("gzip", encoder.finish()?) + } else { + ("identity", json_bytes) + }; + + // 6. INSERT with content_encoding + conn.execute( + "INSERT INTO raw_payloads + (source, project_id, resource_type, gitlab_id, fetched_at, content_encoding, payload_hash, payload) + VALUES ('gitlab', ?, ?, ?, ?, ?, ?, ?)", + ( + options.project_id, + options.resource_type, + options.gitlab_id, + now_ms(), + encoding, + &payload_hash, + &payload_bytes, + ), + )?; + + Ok(conn.last_insert_rowid()) +} + +/// Read a raw payload by ID, decompressing if necessary. 
+/// Returns None if not found. +pub fn read_payload(conn: &Connection, id: i64) -> Result> { + let row: Option<(String, Vec)> = conn + .query_row( + "SELECT content_encoding, payload FROM raw_payloads WHERE id = ?", + [id], + |row| Ok((row.get(0)?, row.get(1)?)), + ) + .ok(); + + let Some((encoding, payload_bytes)) = row else { + return Ok(None); + }; + + // Decompress if needed + let json_bytes = if encoding == "gzip" { + let mut decoder = GzDecoder::new(&payload_bytes[..]); + let mut decompressed = Vec::new(); + decoder.read_to_end(&mut decompressed)?; + decompressed + } else { + payload_bytes + }; + + let value: serde_json::Value = serde_json::from_slice(&json_bytes)?; + Ok(Some(value)) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::core::db::create_connection; + use tempfile::tempdir; + + fn setup_test_db() -> Connection { + let dir = tempdir().unwrap(); + let db_path = dir.path().join("test.db"); + let conn = create_connection(&db_path).unwrap(); + + // Create minimal schema for testing + conn.execute_batch( + "CREATE TABLE raw_payloads ( + id INTEGER PRIMARY KEY, + source TEXT NOT NULL, + project_id INTEGER, + resource_type TEXT NOT NULL, + gitlab_id TEXT NOT NULL, + fetched_at INTEGER NOT NULL, + content_encoding TEXT NOT NULL DEFAULT 'identity', + payload_hash TEXT NOT NULL, + payload BLOB NOT NULL + ); + CREATE UNIQUE INDEX uq_raw_payloads_dedupe + ON raw_payloads(project_id, resource_type, gitlab_id, payload_hash);", + ) + .unwrap(); + + conn + } + + #[test] + fn test_store_and_read_payload() { + let conn = setup_test_db(); + let payload = serde_json::json!({"title": "Test Issue", "id": 123}); + + let id = store_payload( + &conn, + StorePayloadOptions { + project_id: Some(1), + resource_type: "issue", + gitlab_id: "123", + payload: &payload, + compress: false, + }, + ) + .unwrap(); + + let result = read_payload(&conn, id).unwrap().unwrap(); + assert_eq!(result["title"], "Test Issue"); + } + + #[test] + fn test_compression_roundtrip() { + 
        let conn = setup_test_db();
        // 1000 repeated bytes compress well, exercising the gzip path end to end.
        let payload = serde_json::json!({"data": "x".repeat(1000)});

        let id = store_payload(
            &conn,
            StorePayloadOptions {
                project_id: Some(1),
                resource_type: "issue",
                gitlab_id: "456",
                payload: &payload,
                compress: true,
            },
        )
        .unwrap();

        let result = read_payload(&conn, id).unwrap().unwrap();
        assert_eq!(result["data"], "x".repeat(1000));
    }

    /// Storing the identical payload twice must not create a second row.
    #[test]
    fn test_deduplication() {
        let conn = setup_test_db();
        let payload = serde_json::json!({"id": 789});

        let id1 = store_payload(
            &conn,
            StorePayloadOptions {
                project_id: Some(1),
                resource_type: "issue",
                gitlab_id: "789",
                payload: &payload,
                compress: false,
            },
        )
        .unwrap();

        let id2 = store_payload(
            &conn,
            StorePayloadOptions {
                project_id: Some(1),
                resource_type: "issue",
                gitlab_id: "789",
                payload: &payload,
                compress: false,
            },
        )
        .unwrap();

        assert_eq!(id1, id2); // Same payload returns same ID
    }
}
diff --git a/src/core/time.rs b/src/core/time.rs
new file mode 100644
index 0000000..376ea2d
--- /dev/null
+++ b/src/core/time.rs
@@ -0,0 +1,136 @@
//! Time utilities for consistent timestamp handling.
//!
//! All database *_at columns use milliseconds since epoch for consistency.

use chrono::{DateTime, Utc};

/// Convert GitLab API ISO 8601 timestamp to milliseconds since epoch.
///
/// Returns None when the string is not valid RFC 3339 (chrono's parser,
/// which covers GitLab's `created_at`/`updated_at` format).
// NOTE(review): the return-type generic below appears stripped by extraction
// (`Option` was presumably `Option<i64>`); confirm against the repository.
pub fn iso_to_ms(iso_string: &str) -> Option {
    DateTime::parse_from_rfc3339(iso_string)
        .ok()
        .map(|dt| dt.timestamp_millis())
}

/// Convert milliseconds since epoch to ISO 8601 string.
///
/// Out-of-range inputs (where chrono cannot build a DateTime) yield the
/// literal string "Invalid timestamp" rather than panicking.
pub fn ms_to_iso(ms: i64) -> String {
    DateTime::from_timestamp_millis(ms)
        .map(|dt| dt.to_rfc3339())
        .unwrap_or_else(|| "Invalid timestamp".to_string())
}

/// Get current time in milliseconds since epoch.
pub fn now_ms() -> i64 {
    Utc::now().timestamp_millis()
}

/// Parse a relative time string (7d, 2w, 1m) or ISO date into ms epoch.
///
/// Returns the timestamp as of which to filter (cutoff point).
+/// - `7d` = 7 days ago +/// - `2w` = 2 weeks ago +/// - `1m` = 1 month ago (30 days) +/// - `2024-01-15` = midnight UTC on that date +pub fn parse_since(input: &str) -> Option { + let input = input.trim(); + + // Try relative format: Nd, Nw, Nm + if let Some(num_str) = input.strip_suffix('d') { + let days: i64 = num_str.parse().ok()?; + return Some(now_ms() - (days * 24 * 60 * 60 * 1000)); + } + + if let Some(num_str) = input.strip_suffix('w') { + let weeks: i64 = num_str.parse().ok()?; + return Some(now_ms() - (weeks * 7 * 24 * 60 * 60 * 1000)); + } + + if let Some(num_str) = input.strip_suffix('m') { + let months: i64 = num_str.parse().ok()?; + return Some(now_ms() - (months * 30 * 24 * 60 * 60 * 1000)); + } + + // Try ISO date: YYYY-MM-DD + if input.len() == 10 && input.chars().filter(|&c| c == '-').count() == 2 { + let iso_full = format!("{input}T00:00:00Z"); + return iso_to_ms(&iso_full); + } + + // Try full ISO 8601 + iso_to_ms(input) +} + +/// Format milliseconds epoch to human-readable full datetime. 
+pub fn format_full_datetime(ms: i64) -> String { + DateTime::from_timestamp_millis(ms) + .map(|dt| dt.format("%Y-%m-%d %H:%M UTC").to_string()) + .unwrap_or_else(|| "Unknown".to_string()) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_iso_to_ms() { + let ms = iso_to_ms("2024-01-15T10:30:00Z").unwrap(); + assert!(ms > 0); + } + + #[test] + fn test_ms_to_iso() { + let iso = ms_to_iso(1705315800000); + assert!(iso.contains("2024-01-15")); + } + + #[test] + fn test_now_ms() { + let now = now_ms(); + assert!(now > 1700000000000); // After 2023 + } + + #[test] + fn test_parse_since_days() { + let now = now_ms(); + let seven_days = parse_since("7d").unwrap(); + let expected = now - (7 * 24 * 60 * 60 * 1000); + assert!((seven_days - expected).abs() < 1000); // Within 1 second + } + + #[test] + fn test_parse_since_weeks() { + let now = now_ms(); + let two_weeks = parse_since("2w").unwrap(); + let expected = now - (14 * 24 * 60 * 60 * 1000); + assert!((two_weeks - expected).abs() < 1000); + } + + #[test] + fn test_parse_since_months() { + let now = now_ms(); + let one_month = parse_since("1m").unwrap(); + let expected = now - (30 * 24 * 60 * 60 * 1000); + assert!((one_month - expected).abs() < 1000); + } + + #[test] + fn test_parse_since_iso_date() { + let ms = parse_since("2024-01-15").unwrap(); + assert!(ms > 0); + // Should be midnight UTC on that date + let expected = iso_to_ms("2024-01-15T00:00:00Z").unwrap(); + assert_eq!(ms, expected); + } + + #[test] + fn test_parse_since_invalid() { + assert!(parse_since("invalid").is_none()); + assert!(parse_since("").is_none()); + } + + #[test] + fn test_format_full_datetime() { + let dt = format_full_datetime(1705315800000); + assert!(dt.contains("2024-01-15")); + assert!(dt.contains("UTC")); + } +}