Files
gitlore/src/core/lock.rs
Taylor Eernisse a7f86b26e4 refactor(core): compact human log format, quieter lock lifecycle, nonzero_summary helper
Three quality-of-life improvements to reduce log noise and improve readability:

1. logging.rs: Add CompactHumanFormat for stderr tracing output. Replaces the
   default format with a minimal 'HH:MM:SS LEVEL  message key=value' layout —
   no span context, no full timestamps, no target module. The JSON file log
   layer is unaffected. This makes watching 'lore sync' output much cleaner.

2. lock.rs: Downgrade AppLock acquire/release messages from info! to debug!.
   Lock lifecycle events (acquired new, acquired existing, released) are
   operational bookkeeping that clutters normal output. They remain visible
   at -vv verbosity for troubleshooting.

3. ingestion/mod.rs: Add nonzero_summary() utility that formats named counters
   as a compact middle-dot-separated string, suppressing zero values. Produces
   output like '42 fetched · 3 labels · 12 notes' instead of verbose key=value
   structured fields. Returns 'nothing to update' when all values are zero.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-13 22:31:30 -05:00

229 lines
7.6 KiB
Rust

use rusqlite::{Connection, TransactionBehavior};
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
use std::thread;
use std::time::Duration;
use tracing::{debug, error, warn};
use uuid::Uuid;
use super::db::create_connection;
use super::error::{LoreError, Result};
use super::time::{ms_to_iso, now_ms};
const MAX_HEARTBEAT_FAILURES: u32 = 3;
pub struct LockOptions {
pub name: String,
pub stale_lock_minutes: u32,
pub heartbeat_interval_seconds: u32,
}
pub struct AppLock {
conn: Connection,
db_path: PathBuf,
owner: String,
name: String,
stale_lock_ms: i64,
heartbeat_interval_ms: u64,
released: Arc<AtomicBool>,
heartbeat_failed: Arc<AtomicBool>,
heartbeat_failure_count: Arc<AtomicU32>,
heartbeat_handle: Option<thread::JoinHandle<()>>,
}
impl AppLock {
pub fn new(conn: Connection, options: LockOptions) -> Self {
let db_path = conn.path().map(PathBuf::from).unwrap_or_default();
Self {
conn,
db_path,
owner: Uuid::new_v4().to_string(),
name: options.name,
stale_lock_ms: (options.stale_lock_minutes as i64) * 60 * 1000,
heartbeat_interval_ms: (options.heartbeat_interval_seconds as u64) * 1000,
released: Arc::new(AtomicBool::new(false)),
heartbeat_failed: Arc::new(AtomicBool::new(false)),
heartbeat_failure_count: Arc::new(AtomicU32::new(0)),
heartbeat_handle: None,
}
}
pub fn is_heartbeat_healthy(&self) -> bool {
!self.heartbeat_failed.load(Ordering::SeqCst)
}
pub fn acquire(&mut self, force: bool) -> Result<bool> {
let now = now_ms();
let tx = self
.conn
.transaction_with_behavior(TransactionBehavior::Immediate)?;
let existing: Option<(String, i64, i64)> = tx
.query_row(
"SELECT owner, acquired_at, heartbeat_at FROM app_locks WHERE name = ?",
[&self.name],
|row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)),
)
.ok();
match existing {
None => {
tx.execute(
"INSERT INTO app_locks (name, owner, acquired_at, heartbeat_at) VALUES (?, ?, ?, ?)",
(&self.name, &self.owner, now, now),
)?;
debug!(owner = %self.owner, "Lock acquired (new)");
}
Some((existing_owner, acquired_at, heartbeat_at)) => {
let is_stale = now - heartbeat_at > self.stale_lock_ms;
if is_stale || force {
tx.execute(
"UPDATE app_locks SET owner = ?, acquired_at = ?, heartbeat_at = ? WHERE name = ?",
(&self.owner, now, now, &self.name),
)?;
debug!(
owner = %self.owner,
previous_owner = %existing_owner,
was_stale = is_stale,
"Lock acquired (override)"
);
} else if existing_owner == self.owner {
tx.execute(
"UPDATE app_locks SET heartbeat_at = ? WHERE name = ?",
(now, &self.name),
)?;
} else {
drop(tx);
return Err(LoreError::DatabaseLocked {
owner: existing_owner,
started_at: ms_to_iso(acquired_at),
});
}
}
}
tx.commit()?;
self.start_heartbeat();
Ok(true)
}
pub fn release(&mut self) {
if self.released.swap(true, Ordering::SeqCst) {
return;
}
if let Some(handle) = self.heartbeat_handle.take() {
let _ = handle.join();
}
match self.conn.execute(
"DELETE FROM app_locks WHERE name = ? AND owner = ?",
(&self.name, &self.owner),
) {
Ok(_) => debug!(owner = %self.owner, "Lock released"),
Err(e) => error!(
owner = %self.owner,
error = %e,
"Failed to release lock; may require --force on next sync"
),
}
}
fn start_heartbeat(&mut self) {
// Stop any existing heartbeat thread before starting a new one
if let Some(handle) = self.heartbeat_handle.take() {
self.released.store(true, Ordering::SeqCst);
let _ = handle.join();
self.released.store(false, Ordering::SeqCst);
}
let name = self.name.clone();
let owner = self.owner.clone();
let interval = Duration::from_millis(self.heartbeat_interval_ms);
let released = Arc::clone(&self.released);
let heartbeat_failed = Arc::clone(&self.heartbeat_failed);
let failure_count = Arc::clone(&self.heartbeat_failure_count);
let db_path = self.db_path.clone();
if db_path.as_os_str().is_empty() {
return;
}
self.heartbeat_handle = Some(thread::spawn(move || {
let conn = match create_connection(&db_path) {
Ok(c) => c,
Err(e) => {
error!(error = %e, "Failed to create heartbeat connection");
heartbeat_failed.store(true, Ordering::SeqCst);
return;
}
};
const POLL_INTERVAL: Duration = Duration::from_millis(100);
loop {
let mut elapsed = Duration::ZERO;
while elapsed < interval {
thread::sleep(POLL_INTERVAL);
elapsed += POLL_INTERVAL;
if released.load(Ordering::SeqCst) {
return;
}
}
if released.load(Ordering::SeqCst) {
break;
}
let now = now_ms();
let result = conn.execute(
"UPDATE app_locks SET heartbeat_at = ? WHERE name = ? AND owner = ?",
(now, &name, &owner),
);
match result {
Ok(rows_affected) => {
if rows_affected == 0 {
warn!(owner = %owner, "Heartbeat failed: lock no longer held");
heartbeat_failed.store(true, Ordering::SeqCst);
break;
}
failure_count.store(0, Ordering::SeqCst);
debug!(owner = %owner, "Heartbeat updated");
}
Err(e) => {
let count = failure_count.fetch_add(1, Ordering::SeqCst) + 1;
warn!(
owner = %owner,
error = %e,
consecutive_failures = count,
"Heartbeat update failed"
);
if count >= MAX_HEARTBEAT_FAILURES {
error!(
owner = %owner,
"Heartbeat failed {} times consecutively, signaling failure",
MAX_HEARTBEAT_FAILURES
);
heartbeat_failed.store(true, Ordering::SeqCst);
break;
}
}
}
}
}));
}
}
impl Drop for AppLock {
fn drop(&mut self) {
self.release();
}
}