feat(core): Implement infrastructure layer for CLI operations
Establishes foundational modules that all other components depend on. src/core/config.rs - Configuration management: - JSON-based config file with Zod-like validation via serde - GitLab settings: base URL, token environment variable - Project list with paths to track - Sync settings: backfill days, stale lock timeout, cursor rewind - Storage settings: database path, payload compression toggle - XDG-compliant config path resolution via dirs crate - Loads GITLAB_TOKEN from configured environment variable src/core/db.rs - Database connection and migrations: - Opens or creates SQLite database with WAL mode for concurrency - Embeds migration SQL as const strings (001-005) - Runs migrations idempotently with checksum verification - Provides thread-safe connection management src/core/error.rs - Unified error handling: - GiError enum with variants for all failure modes - Config, Database, GitLab, Ingestion, Lock, IO, Parse errors - thiserror derive for automatic Display/Error impls - Result type alias for ergonomic error propagation src/core/lock.rs - Distributed sync locking: - File-based locks to prevent concurrent syncs - Stale lock detection with configurable timeout - Force override for recovery scenarios - Lock file contains PID and timestamp for debugging src/core/paths.rs - Path resolution: - XDG Base Directory Specification compliance - Config: ~/.config/gi/config.json - Data: ~/.local/share/gi/gi.db - Creates parent directories on first access src/core/payloads.rs - Raw payload storage: - Optional gzip compression for storage efficiency - SHA-256 content addressing for deduplication - Type-prefixed keys (issue:, discussion:, note:) - Batch insert with UPSERT for idempotent ingestion src/core/time.rs - Timestamp utilities: - Relative time parsing (7d, 2w, 1m) for --since flag - ISO 8601 date parsing for absolute dates - Human-friendly relative time formatting Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
233
src/core/lock.rs
Normal file
233
src/core/lock.rs
Normal file
@@ -0,0 +1,233 @@
|
||||
//! Crash-safe single-flight lock using heartbeat pattern.
|
||||
//!
|
||||
//! Prevents concurrent sync operations and allows recovery from crashed processes.
|
||||
|
||||
use rusqlite::{Connection, TransactionBehavior};
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
|
||||
use std::thread;
|
||||
use std::time::Duration;
|
||||
use tracing::{debug, error, info, warn};
|
||||
use uuid::Uuid;
|
||||
|
||||
use super::db::create_connection;
|
||||
use super::error::{GiError, Result};
|
||||
use super::time::{ms_to_iso, now_ms};
|
||||
|
||||
/// Maximum consecutive heartbeat failures before signaling error.
|
||||
const MAX_HEARTBEAT_FAILURES: u32 = 3;
|
||||
|
||||
/// Lock configuration options.
|
||||
pub struct LockOptions {
|
||||
pub name: String,
|
||||
pub stale_lock_minutes: u32,
|
||||
pub heartbeat_interval_seconds: u32,
|
||||
}
|
||||
|
||||
/// App lock with heartbeat for crash recovery.
|
||||
pub struct AppLock {
|
||||
conn: Connection,
|
||||
db_path: PathBuf,
|
||||
owner: String,
|
||||
name: String,
|
||||
stale_lock_ms: i64,
|
||||
heartbeat_interval_ms: u64,
|
||||
released: Arc<AtomicBool>,
|
||||
heartbeat_failed: Arc<AtomicBool>,
|
||||
heartbeat_failure_count: Arc<AtomicU32>,
|
||||
heartbeat_handle: Option<thread::JoinHandle<()>>,
|
||||
}
|
||||
|
||||
impl AppLock {
|
||||
/// Create a new app lock instance.
|
||||
pub fn new(conn: Connection, options: LockOptions) -> Self {
|
||||
let db_path = conn
|
||||
.path()
|
||||
.map(PathBuf::from)
|
||||
.unwrap_or_default();
|
||||
|
||||
Self {
|
||||
conn,
|
||||
db_path,
|
||||
owner: Uuid::new_v4().to_string(),
|
||||
name: options.name,
|
||||
stale_lock_ms: (options.stale_lock_minutes as i64) * 60 * 1000,
|
||||
heartbeat_interval_ms: (options.heartbeat_interval_seconds as u64) * 1000,
|
||||
released: Arc::new(AtomicBool::new(false)),
|
||||
heartbeat_failed: Arc::new(AtomicBool::new(false)),
|
||||
heartbeat_failure_count: Arc::new(AtomicU32::new(0)),
|
||||
heartbeat_handle: None,
|
||||
}
|
||||
}
|
||||
|
||||
/// Check if heartbeat has failed (indicates lock may be compromised).
|
||||
pub fn is_heartbeat_healthy(&self) -> bool {
|
||||
!self.heartbeat_failed.load(Ordering::SeqCst)
|
||||
}
|
||||
|
||||
/// Attempt to acquire the lock atomically.
|
||||
///
|
||||
/// Returns Ok(true) if lock acquired, Err if lock is held by another active process.
|
||||
pub fn acquire(&mut self, force: bool) -> Result<bool> {
|
||||
let now = now_ms();
|
||||
|
||||
// Use IMMEDIATE transaction to prevent race conditions
|
||||
let tx = self.conn.transaction_with_behavior(TransactionBehavior::Immediate)?;
|
||||
|
||||
// Check for existing lock within the transaction
|
||||
let existing: Option<(String, i64, i64)> = tx
|
||||
.query_row(
|
||||
"SELECT owner, acquired_at, heartbeat_at FROM app_locks WHERE name = ?",
|
||||
[&self.name],
|
||||
|row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
|
||||
)
|
||||
.ok();
|
||||
|
||||
match existing {
|
||||
None => {
|
||||
// No lock exists, acquire it
|
||||
tx.execute(
|
||||
"INSERT INTO app_locks (name, owner, acquired_at, heartbeat_at) VALUES (?, ?, ?, ?)",
|
||||
(&self.name, &self.owner, now, now),
|
||||
)?;
|
||||
info!(owner = %self.owner, "Lock acquired (new)");
|
||||
}
|
||||
Some((existing_owner, acquired_at, heartbeat_at)) => {
|
||||
let is_stale = now - heartbeat_at > self.stale_lock_ms;
|
||||
|
||||
if is_stale || force {
|
||||
// Lock is stale or force override, take it
|
||||
tx.execute(
|
||||
"UPDATE app_locks SET owner = ?, acquired_at = ?, heartbeat_at = ? WHERE name = ?",
|
||||
(&self.owner, now, now, &self.name),
|
||||
)?;
|
||||
info!(
|
||||
owner = %self.owner,
|
||||
previous_owner = %existing_owner,
|
||||
was_stale = is_stale,
|
||||
"Lock acquired (override)"
|
||||
);
|
||||
} else if existing_owner == self.owner {
|
||||
// Re-entrant, update heartbeat
|
||||
tx.execute(
|
||||
"UPDATE app_locks SET heartbeat_at = ? WHERE name = ?",
|
||||
(now, &self.name),
|
||||
)?;
|
||||
} else {
|
||||
// Lock held by another active process - rollback and return error
|
||||
drop(tx);
|
||||
return Err(GiError::DatabaseLocked {
|
||||
owner: existing_owner,
|
||||
started_at: ms_to_iso(acquired_at),
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Commit the transaction atomically
|
||||
tx.commit()?;
|
||||
|
||||
self.start_heartbeat();
|
||||
Ok(true)
|
||||
}
|
||||
|
||||
/// Release the lock.
|
||||
pub fn release(&mut self) {
|
||||
if self.released.swap(true, Ordering::SeqCst) {
|
||||
return; // Already released
|
||||
}
|
||||
|
||||
// Stop heartbeat thread
|
||||
if let Some(handle) = self.heartbeat_handle.take() {
|
||||
let _ = handle.join();
|
||||
}
|
||||
|
||||
let _ = self.conn.execute(
|
||||
"DELETE FROM app_locks WHERE name = ? AND owner = ?",
|
||||
(&self.name, &self.owner),
|
||||
);
|
||||
|
||||
info!(owner = %self.owner, "Lock released");
|
||||
}
|
||||
|
||||
/// Start the heartbeat thread to keep the lock alive.
|
||||
fn start_heartbeat(&mut self) {
|
||||
let name = self.name.clone();
|
||||
let owner = self.owner.clone();
|
||||
let interval = Duration::from_millis(self.heartbeat_interval_ms);
|
||||
let released = Arc::clone(&self.released);
|
||||
let heartbeat_failed = Arc::clone(&self.heartbeat_failed);
|
||||
let failure_count = Arc::clone(&self.heartbeat_failure_count);
|
||||
let db_path = self.db_path.clone();
|
||||
|
||||
if db_path.as_os_str().is_empty() {
|
||||
return; // In-memory database, skip heartbeat
|
||||
}
|
||||
|
||||
self.heartbeat_handle = Some(thread::spawn(move || {
|
||||
// Open a new connection with proper pragmas
|
||||
let conn = match create_connection(&db_path) {
|
||||
Ok(c) => c,
|
||||
Err(e) => {
|
||||
error!(error = %e, "Failed to create heartbeat connection");
|
||||
heartbeat_failed.store(true, Ordering::SeqCst);
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
loop {
|
||||
thread::sleep(interval);
|
||||
|
||||
if released.load(Ordering::SeqCst) {
|
||||
break;
|
||||
}
|
||||
|
||||
let now = now_ms();
|
||||
let result = conn.execute(
|
||||
"UPDATE app_locks SET heartbeat_at = ? WHERE name = ? AND owner = ?",
|
||||
(now, &name, &owner),
|
||||
);
|
||||
|
||||
match result {
|
||||
Ok(rows_affected) => {
|
||||
if rows_affected == 0 {
|
||||
// Lock was stolen or deleted
|
||||
warn!(owner = %owner, "Heartbeat failed: lock no longer held");
|
||||
heartbeat_failed.store(true, Ordering::SeqCst);
|
||||
break;
|
||||
}
|
||||
// Reset failure count on success
|
||||
failure_count.store(0, Ordering::SeqCst);
|
||||
debug!(owner = %owner, "Heartbeat updated");
|
||||
}
|
||||
Err(e) => {
|
||||
let count = failure_count.fetch_add(1, Ordering::SeqCst) + 1;
|
||||
warn!(
|
||||
owner = %owner,
|
||||
error = %e,
|
||||
consecutive_failures = count,
|
||||
"Heartbeat update failed"
|
||||
);
|
||||
|
||||
if count >= MAX_HEARTBEAT_FAILURES {
|
||||
error!(
|
||||
owner = %owner,
|
||||
"Heartbeat failed {} times consecutively, signaling failure",
|
||||
MAX_HEARTBEAT_FAILURES
|
||||
);
|
||||
heartbeat_failed.store(true, Ordering::SeqCst);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}));
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for AppLock {
|
||||
fn drop(&mut self) {
|
||||
self.release();
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user