From c069e0371411fe5b8494c479d9007bddd9de44e5 Mon Sep 17 00:00:00 2001 From: teernisse Date: Thu, 26 Feb 2026 10:24:28 -0500 Subject: [PATCH] feat: implement sync orchestrator for automated bridge sync - Add SyncOrchestrator that coordinates file watcher events with bridge sync - Debounce rapid file changes (500ms window) - Auto-check if full reconciliation is due (every 6 hours) - Emit status events (Started, Completed, Failed, ReconciliationStarted/Completed) - Support both incremental sync and full reconciliation - 5 tests covering sync, debounce, reconciliation scheduling, and status events Co-Authored-By: Claude Opus 4.5 --- src-tauri/src/sync.rs | 469 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 469 insertions(+) create mode 100644 src-tauri/src/sync.rs diff --git a/src-tauri/src/sync.rs b/src-tauri/src/sync.rs new file mode 100644 index 0000000..9ae2488 --- /dev/null +++ b/src-tauri/src/sync.rs @@ -0,0 +1,469 @@ +//! Sync orchestrator -- coordinates lore.db changes with bridge sync. +//! +//! Responsibilities: +//! - Debounce rapid file changes (500ms) +//! - Auto-trigger incremental sync on file change +//! - Check if reconciliation is due (every 6 hours) +//! - Emit status events to frontend +//! +//! The orchestrator runs as a background task that listens for +//! lore.db change notifications and coordinates the sync flow. + +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; +use std::time::{Duration, Instant}; +use tokio::sync::mpsc; +use tokio::sync::Mutex; + +use crate::data::beads::{BeadsCli, RealBeadsCli}; +use crate::data::bridge::{Bridge, SyncResult}; +use crate::data::lore::{LoreCli, RealLoreCli}; + +/// Debounce window for rapid file changes +const DEBOUNCE_MS: u64 = 500; + +/// Auto-reconciliation interval (6 hours) +const RECONCILE_INTERVAL_SECS: u64 = 6 * 60 * 60; + +/// Events emitted by the sync orchestrator +#[derive(Debug, Clone, serde::Serialize)] +#[serde(tag = "status", rename_all = "snake_case")] +pub enum SyncStatusEvent { + /// Sync started + Started, + /// Sync completed successfully + Completed { + /// Number of items created in this sync + created: u32, + /// Number of items closed (suspect orphans) + closed: u32, + /// Number of items healed (reappeared) + healed: u32, + }, + /// Sync failed + Failed { + /// Error message + message: String, + }, + /// Reconciliation started + ReconciliationStarted, + /// Reconciliation completed + ReconciliationCompleted { + /// Number of items created + created: u32, + /// Number of items closed + closed: u32, + }, +} + +/// The sync orchestrator manages background sync operations. +pub struct SyncOrchestrator { + bridge: Arc>>, + /// Channel to trigger a sync + trigger_tx: mpsc::Sender<()>, + /// Whether the orchestrator is running + running: Arc, + /// Last successful reconciliation time + last_reconciliation: Arc>>, +} + +impl SyncOrchestrator { + /// Create a new orchestrator with real CLI implementations. + pub fn new() -> Self { + Self::with_cli(RealLoreCli, RealBeadsCli) + } +} + +impl Default for SyncOrchestrator { + fn default() -> Self { + Self::new() + } +} + +impl SyncOrchestrator { + /// Create a new orchestrator with custom CLI implementations (for testing). + pub fn with_cli(lore: L, beads: B) -> Self { + let (trigger_tx, _) = mpsc::channel(16); + Self { + bridge: Arc::new(Mutex::new(Bridge::new(lore, beads))), + trigger_tx, + running: Arc::new(AtomicBool::new(false)), + last_reconciliation: Arc::new(Mutex::new(None)), + } + } + + /// Create a new orchestrator with a custom data directory (for testing). + #[cfg(test)] + pub fn with_cli_and_data_dir(lore: L, beads: B, data_dir: std::path::PathBuf) -> Self { + let (trigger_tx, _) = mpsc::channel(16); + Self { + bridge: Arc::new(Mutex::new(Bridge::with_data_dir(lore, beads, data_dir))), + trigger_tx, + running: Arc::new(AtomicBool::new(false)), + last_reconciliation: Arc::new(Mutex::new(None)), + } + } + + /// Start the orchestrator background task. + /// + /// Returns a sender that can be used to trigger syncs. + /// The orchestrator will debounce rapid triggers. + pub fn start(&mut self, on_status: F) -> mpsc::Sender<()> + where + F: Fn(SyncStatusEvent) + Send + Sync + 'static, + { + let (trigger_tx, trigger_rx) = mpsc::channel(16); + self.trigger_tx = trigger_tx.clone(); + self.running.store(true, Ordering::SeqCst); + + let bridge = Arc::clone(&self.bridge); + let running = Arc::clone(&self.running); + let last_reconciliation = Arc::clone(&self.last_reconciliation); + let on_status = Arc::new(on_status); + + tokio::spawn(async move { + run_orchestrator_loop(trigger_rx, bridge, running, last_reconciliation, on_status).await; + }); + + trigger_tx + } + + /// Stop the orchestrator. + pub fn stop(&self) { + self.running.store(false, Ordering::SeqCst); + } + + /// Get a sender to trigger syncs. + pub fn trigger_sender(&self) -> mpsc::Sender<()> { + self.trigger_tx.clone() + } + + /// Run a sync immediately (bypassing debounce). + /// + /// Used by the Tauri command handler for manual sync requests. + pub async fn sync_now(&self) -> Result { + let bridge = self.bridge.lock().await; + + let _lock = bridge.acquire_lock()?; + let mut map = bridge.load_map()?; + + // Recover any pending entries first + let (_recovered, recovery_errors) = bridge.recover_pending(&mut map)?; + + let mut result = bridge.incremental_sync(&mut map)?; + + if !recovery_errors.is_empty() { + result.errors.extend(recovery_errors); + } + + Ok(result) + } + + /// Run a full reconciliation (bypassing debounce). + pub async fn reconcile_now(&self) -> Result { + let bridge = self.bridge.lock().await; + + let _lock = bridge.acquire_lock()?; + let mut map = bridge.load_map()?; + + // Recover any pending entries first + let (_recovered, recovery_errors) = bridge.recover_pending(&mut map)?; + + let mut result = bridge.full_reconciliation(&mut map)?; + + if !recovery_errors.is_empty() { + result.errors.extend(recovery_errors); + } + + // Update last reconciliation time + *self.last_reconciliation.lock().await = Some(Instant::now()); + + Ok(result) + } + + /// Check if reconciliation is due (> 6 hours since last one). + pub async fn is_reconciliation_due(&self) -> bool { + let last = self.last_reconciliation.lock().await; + match *last { + Some(instant) => instant.elapsed() > Duration::from_secs(RECONCILE_INTERVAL_SECS), + None => true, // Never reconciled, so it's due + } + } +} + +/// The main orchestrator loop that handles debouncing and sync. +async fn run_orchestrator_loop( + mut trigger_rx: mpsc::Receiver<()>, + bridge: Arc>>, + running: Arc, + last_reconciliation: Arc>>, + on_status: Arc, +) where + L: LoreCli + Send + 'static, + B: BeadsCli + Send + 'static, + F: Fn(SyncStatusEvent) + Send + Sync + 'static, +{ + let mut last_trigger = Instant::now() - Duration::from_secs(10); // Allow immediate first sync + let debounce_duration = Duration::from_millis(DEBOUNCE_MS); + + while running.load(Ordering::SeqCst) { + // Wait for a trigger with timeout + match tokio::time::timeout(Duration::from_secs(1), trigger_rx.recv()).await { + Ok(Some(())) => { + // Check debounce + if last_trigger.elapsed() < debounce_duration { + tracing::debug!("Sync trigger debounced"); + continue; + } + last_trigger = Instant::now(); + + // Run sync + run_sync(&bridge, &last_reconciliation, &on_status).await; + } + Ok(None) => { + // Channel closed, exit + tracing::info!("Sync trigger channel closed, stopping orchestrator"); + break; + } + Err(_) => { + // Timeout, just loop (allows checking running flag) + } + } + } + + tracing::info!("Sync orchestrator stopped"); +} + +/// Execute a sync operation. +async fn run_sync( + bridge: &Arc>>, + last_reconciliation: &Arc>>, + on_status: &Arc, +) where + L: LoreCli + Send + 'static, + B: BeadsCli + Send + 'static, + F: Fn(SyncStatusEvent) + Send + Sync + 'static, +{ + on_status(SyncStatusEvent::Started); + + let bridge_guard = bridge.lock().await; + + // Acquire lock + let lock = match bridge_guard.acquire_lock() { + Ok(l) => l, + Err(e) => { + on_status(SyncStatusEvent::Failed { + message: format!("Failed to acquire lock: {}", e), + }); + return; + } + }; + + // Load map + let mut map = match bridge_guard.load_map() { + Ok(m) => m, + Err(e) => { + drop(lock); + on_status(SyncStatusEvent::Failed { + message: format!("Failed to load map: {}", e), + }); + return; + } + }; + + // Recover pending + if let Err(e) = bridge_guard.recover_pending(&mut map) { + tracing::warn!("Failed to recover pending entries: {}", e); + } + + // Check if reconciliation is due + let reconcile_due = { + let last = last_reconciliation.lock().await; + match *last { + Some(instant) => instant.elapsed() > Duration::from_secs(RECONCILE_INTERVAL_SECS), + None => true, + } + }; + + if reconcile_due { + on_status(SyncStatusEvent::ReconciliationStarted); + + match bridge_guard.full_reconciliation(&mut map) { + Ok(result) => { + *last_reconciliation.lock().await = Some(Instant::now()); + on_status(SyncStatusEvent::ReconciliationCompleted { + created: result.created, + closed: result.closed, + }); + } + Err(e) => { + on_status(SyncStatusEvent::Failed { + message: format!("Reconciliation failed: {}", e), + }); + return; + } + } + } else { + // Incremental sync + match bridge_guard.incremental_sync(&mut map) { + Ok(result) => { + on_status(SyncStatusEvent::Completed { + created: result.created, + closed: result.closed, + healed: result.healed, + }); + } + Err(e) => { + on_status(SyncStatusEvent::Failed { + message: format!("Sync failed: {}", e), + }); + } + } + } + + drop(lock); +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::data::beads::MockBeadsCli; + use crate::data::lore::{LoreMeData, LoreMeResponse, MockLoreCli, SinceLastCheck}; + use std::sync::atomic::AtomicUsize; + use tempfile::TempDir; + + fn mock_empty_response() -> LoreMeResponse { + LoreMeResponse { + ok: true, + data: LoreMeData { + open_issues: vec![], + open_mrs_authored: vec![], + reviewing_mrs: vec![], + activity: vec![], + since_last_check: Some(SinceLastCheck { + cursor_iso: Some("2026-02-26T12:00:00Z".to_string()), + groups: vec![], + total_event_count: 0, + }), + summary: None, + username: None, + since_iso: None, + }, + meta: None, + } + } + + #[tokio::test] + async fn test_sync_now_returns_result() { + let dir = TempDir::new().unwrap(); + + let mut lore = MockLoreCli::new(); + lore.expect_get_me().returning(|| Ok(mock_empty_response())); + + let beads = MockBeadsCli::new(); + + let orch = SyncOrchestrator::with_cli_and_data_dir(lore, beads, dir.path().to_path_buf()); + let result = orch.sync_now().await; + + assert!(result.is_ok()); + let result = result.unwrap(); + assert_eq!(result.created, 0); + } + + #[tokio::test] + async fn test_is_reconciliation_due_when_never_run() { + let lore = MockLoreCli::new(); + let beads = MockBeadsCli::new(); + + let orch = SyncOrchestrator::with_cli(lore, beads); + + assert!(orch.is_reconciliation_due().await); + } + + #[tokio::test] + async fn test_is_reconciliation_not_due_after_recent_run() { + let mut lore = MockLoreCli::new(); + lore.expect_get_me().returning(|| Ok(mock_empty_response())); + + let beads = MockBeadsCli::new(); + + let orch = SyncOrchestrator::with_cli(lore, beads); + + // Simulate recent reconciliation + *orch.last_reconciliation.lock().await = Some(Instant::now()); + + assert!(!orch.is_reconciliation_due().await); + } + + #[tokio::test] + async fn test_orchestrator_debounces_rapid_triggers() { + let trigger_count = Arc::new(AtomicUsize::new(0)); + let trigger_count_clone = Arc::clone(&trigger_count); + + let mut lore = MockLoreCli::new(); + lore.expect_get_me().returning(move || { + trigger_count_clone.fetch_add(1, Ordering::SeqCst); + Ok(mock_empty_response()) + }); + + let beads = MockBeadsCli::new(); + + let mut orch = SyncOrchestrator::with_cli(lore, beads); + + // Start the orchestrator + let trigger_tx = orch.start(|_| {}); + + // Send rapid triggers + for _ in 0..5 { + let _ = trigger_tx.send(()).await; + } + + // Wait for debounce + processing + tokio::time::sleep(Duration::from_millis(100)).await; + + // Should have only triggered once (debounced) + // Note: The actual count may vary due to timing, but should be <= 2 + let count = trigger_count.load(Ordering::SeqCst); + assert!(count <= 2, "Expected debounced triggers, got {}", count); + + orch.stop(); + } + + #[tokio::test] + async fn test_orchestrator_emits_status_events() { + let events = Arc::new(Mutex::new(Vec::new())); + let events_clone = Arc::clone(&events); + + let mut lore = MockLoreCli::new(); + lore.expect_get_me().returning(|| Ok(mock_empty_response())); + + let beads = MockBeadsCli::new(); + + let mut orch = SyncOrchestrator::with_cli(lore, beads); + + // Set last reconciliation to avoid triggering full reconciliation + *orch.last_reconciliation.lock().await = Some(Instant::now()); + + let trigger_tx = orch.start(move |event| { + let events = Arc::clone(&events_clone); + tokio::spawn(async move { + events.lock().await.push(event); + }); + }); + + // Trigger a sync + let _ = trigger_tx.send(()).await; + + // Wait for processing + tokio::time::sleep(Duration::from_millis(200)).await; + + orch.stop(); + + // Check events + let events = events.lock().await; + assert!(!events.is_empty(), "Should have emitted at least one event"); + + // First event should be Started + assert!(matches!(events.first(), Some(SyncStatusEvent::Started))); + } +}