use rusqlite::{Connection, TransactionBehavior}; use std::path::PathBuf; use std::sync::Arc; use std::sync::atomic::{AtomicBool, AtomicU32, Ordering}; use std::thread; use std::time::Duration; use tracing::{debug, error, info, warn}; use uuid::Uuid; use super::db::create_connection; use super::error::{LoreError, Result}; use super::time::{ms_to_iso, now_ms}; const MAX_HEARTBEAT_FAILURES: u32 = 3; pub struct LockOptions { pub name: String, pub stale_lock_minutes: u32, pub heartbeat_interval_seconds: u32, } pub struct AppLock { conn: Connection, db_path: PathBuf, owner: String, name: String, stale_lock_ms: i64, heartbeat_interval_ms: u64, released: Arc, heartbeat_failed: Arc, heartbeat_failure_count: Arc, heartbeat_handle: Option>, } impl AppLock { pub fn new(conn: Connection, options: LockOptions) -> Self { let db_path = conn.path().map(PathBuf::from).unwrap_or_default(); Self { conn, db_path, owner: Uuid::new_v4().to_string(), name: options.name, stale_lock_ms: (options.stale_lock_minutes as i64) * 60 * 1000, heartbeat_interval_ms: (options.heartbeat_interval_seconds as u64) * 1000, released: Arc::new(AtomicBool::new(false)), heartbeat_failed: Arc::new(AtomicBool::new(false)), heartbeat_failure_count: Arc::new(AtomicU32::new(0)), heartbeat_handle: None, } } pub fn is_heartbeat_healthy(&self) -> bool { !self.heartbeat_failed.load(Ordering::SeqCst) } pub fn acquire(&mut self, force: bool) -> Result { let now = now_ms(); let tx = self .conn .transaction_with_behavior(TransactionBehavior::Immediate)?; let existing: Option<(String, i64, i64)> = tx .query_row( "SELECT owner, acquired_at, heartbeat_at FROM app_locks WHERE name = ?", [&self.name], |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)), ) .ok(); match existing { None => { tx.execute( "INSERT INTO app_locks (name, owner, acquired_at, heartbeat_at) VALUES (?, ?, ?, ?)", (&self.name, &self.owner, now, now), )?; info!(owner = %self.owner, "Lock acquired (new)"); } Some((existing_owner, acquired_at, heartbeat_at)) => { let is_stale = now - heartbeat_at > self.stale_lock_ms; if is_stale || force { tx.execute( "UPDATE app_locks SET owner = ?, acquired_at = ?, heartbeat_at = ? WHERE name = ?", (&self.owner, now, now, &self.name), )?; info!( owner = %self.owner, previous_owner = %existing_owner, was_stale = is_stale, "Lock acquired (override)" ); } else if existing_owner == self.owner { tx.execute( "UPDATE app_locks SET heartbeat_at = ? WHERE name = ?", (now, &self.name), )?; } else { drop(tx); return Err(LoreError::DatabaseLocked { owner: existing_owner, started_at: ms_to_iso(acquired_at), }); } } } tx.commit()?; self.start_heartbeat(); Ok(true) } pub fn release(&mut self) { if self.released.swap(true, Ordering::SeqCst) { return; } if let Some(handle) = self.heartbeat_handle.take() { let _ = handle.join(); } let _ = self.conn.execute( "DELETE FROM app_locks WHERE name = ? AND owner = ?", (&self.name, &self.owner), ); info!(owner = %self.owner, "Lock released"); } fn start_heartbeat(&mut self) { let name = self.name.clone(); let owner = self.owner.clone(); let interval = Duration::from_millis(self.heartbeat_interval_ms); let released = Arc::clone(&self.released); let heartbeat_failed = Arc::clone(&self.heartbeat_failed); let failure_count = Arc::clone(&self.heartbeat_failure_count); let db_path = self.db_path.clone(); if db_path.as_os_str().is_empty() { return; } self.heartbeat_handle = Some(thread::spawn(move || { let conn = match create_connection(&db_path) { Ok(c) => c, Err(e) => { error!(error = %e, "Failed to create heartbeat connection"); heartbeat_failed.store(true, Ordering::SeqCst); return; } }; const POLL_INTERVAL: Duration = Duration::from_millis(100); loop { let mut elapsed = Duration::ZERO; while elapsed < interval { thread::sleep(POLL_INTERVAL); elapsed += POLL_INTERVAL; if released.load(Ordering::SeqCst) { return; } } if released.load(Ordering::SeqCst) { break; } let now = now_ms(); let result = conn.execute( "UPDATE app_locks SET heartbeat_at = ? WHERE name = ? AND owner = ?", (now, &name, &owner), ); match result { Ok(rows_affected) => { if rows_affected == 0 { warn!(owner = %owner, "Heartbeat failed: lock no longer held"); heartbeat_failed.store(true, Ordering::SeqCst); break; } failure_count.store(0, Ordering::SeqCst); debug!(owner = %owner, "Heartbeat updated"); } Err(e) => { let count = failure_count.fetch_add(1, Ordering::SeqCst) + 1; warn!( owner = %owner, error = %e, consecutive_failures = count, "Heartbeat update failed" ); if count >= MAX_HEARTBEAT_FAILURES { error!( owner = %owner, "Heartbeat failed {} times consecutively, signaling failure", MAX_HEARTBEAT_FAILURES ); heartbeat_failed.store(true, Ordering::SeqCst); break; } } } } })); } } impl Drop for AppLock { fn drop(&mut self) { self.release(); } }