feat: implement sync orchestrator for automated bridge sync

- Add SyncOrchestrator that coordinates file watcher events with bridge sync
- Debounce rapid file changes (500ms window)
- Auto-check if full reconciliation is due (every 6 hours)
- Emit status events (Started, Completed, Failed, ReconciliationStarted/Completed)
- Support both incremental sync and full reconciliation
- 5 tests covering sync, debounce, reconciliation scheduling, and status events

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
teernisse
2026-02-26 10:24:28 -05:00
parent da13b99b75
commit c069e03714

469
src-tauri/src/sync.rs Normal file
View File

@@ -0,0 +1,469 @@
//! Sync orchestrator -- coordinates lore.db changes with bridge sync.
//!
//! Responsibilities:
//! - Debounce rapid file changes (500ms)
//! - Auto-trigger incremental sync on file change
//! - Check if reconciliation is due (every 6 hours)
//! - Emit status events to frontend
//!
//! The orchestrator runs as a background task that listens for
//! lore.db change notifications and coordinates the sync flow.
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::mpsc;
use tokio::sync::Mutex;
use crate::data::beads::{BeadsCli, RealBeadsCli};
use crate::data::bridge::{Bridge, SyncResult};
use crate::data::lore::{LoreCli, RealLoreCli};
/// Debounce window for rapid file changes
const DEBOUNCE_MS: u64 = 500;
/// Auto-reconciliation interval (6 hours)
const RECONCILE_INTERVAL_SECS: u64 = 6 * 60 * 60;
/// Events emitted by the sync orchestrator
#[derive(Debug, Clone, serde::Serialize)]
#[serde(tag = "status", rename_all = "snake_case")]
pub enum SyncStatusEvent {
/// Sync started
Started,
/// Sync completed successfully
Completed {
/// Number of items created in this sync
created: u32,
/// Number of items closed (suspect orphans)
closed: u32,
/// Number of items healed (reappeared)
healed: u32,
},
/// Sync failed
Failed {
/// Error message
message: String,
},
/// Reconciliation started
ReconciliationStarted,
/// Reconciliation completed
ReconciliationCompleted {
/// Number of items created
created: u32,
/// Number of items closed
closed: u32,
},
}
/// The sync orchestrator manages background sync operations.
pub struct SyncOrchestrator<L: LoreCli + Send + 'static, B: BeadsCli + Send + 'static> {
bridge: Arc<Mutex<Bridge<L, B>>>,
/// Channel to trigger a sync
trigger_tx: mpsc::Sender<()>,
/// Whether the orchestrator is running
running: Arc<AtomicBool>,
/// Last successful reconciliation time
last_reconciliation: Arc<Mutex<Option<Instant>>>,
}
impl SyncOrchestrator<RealLoreCli, RealBeadsCli> {
/// Create a new orchestrator with real CLI implementations.
pub fn new() -> Self {
Self::with_cli(RealLoreCli, RealBeadsCli)
}
}
impl Default for SyncOrchestrator<RealLoreCli, RealBeadsCli> {
fn default() -> Self {
Self::new()
}
}
impl<L: LoreCli + Send + 'static, B: BeadsCli + Send + 'static> SyncOrchestrator<L, B> {
/// Create a new orchestrator with custom CLI implementations (for testing).
pub fn with_cli(lore: L, beads: B) -> Self {
let (trigger_tx, _) = mpsc::channel(16);
Self {
bridge: Arc::new(Mutex::new(Bridge::new(lore, beads))),
trigger_tx,
running: Arc::new(AtomicBool::new(false)),
last_reconciliation: Arc::new(Mutex::new(None)),
}
}
/// Create a new orchestrator with a custom data directory (for testing).
#[cfg(test)]
pub fn with_cli_and_data_dir(lore: L, beads: B, data_dir: std::path::PathBuf) -> Self {
let (trigger_tx, _) = mpsc::channel(16);
Self {
bridge: Arc::new(Mutex::new(Bridge::with_data_dir(lore, beads, data_dir))),
trigger_tx,
running: Arc::new(AtomicBool::new(false)),
last_reconciliation: Arc::new(Mutex::new(None)),
}
}
/// Start the orchestrator background task.
///
/// Returns a sender that can be used to trigger syncs.
/// The orchestrator will debounce rapid triggers.
pub fn start<F>(&mut self, on_status: F) -> mpsc::Sender<()>
where
F: Fn(SyncStatusEvent) + Send + Sync + 'static,
{
let (trigger_tx, trigger_rx) = mpsc::channel(16);
self.trigger_tx = trigger_tx.clone();
self.running.store(true, Ordering::SeqCst);
let bridge = Arc::clone(&self.bridge);
let running = Arc::clone(&self.running);
let last_reconciliation = Arc::clone(&self.last_reconciliation);
let on_status = Arc::new(on_status);
tokio::spawn(async move {
run_orchestrator_loop(trigger_rx, bridge, running, last_reconciliation, on_status).await;
});
trigger_tx
}
/// Stop the orchestrator.
pub fn stop(&self) {
self.running.store(false, Ordering::SeqCst);
}
/// Get a sender to trigger syncs.
pub fn trigger_sender(&self) -> mpsc::Sender<()> {
self.trigger_tx.clone()
}
/// Run a sync immediately (bypassing debounce).
///
/// Used by the Tauri command handler for manual sync requests.
pub async fn sync_now(&self) -> Result<SyncResult, crate::error::McError> {
let bridge = self.bridge.lock().await;
let _lock = bridge.acquire_lock()?;
let mut map = bridge.load_map()?;
// Recover any pending entries first
let (_recovered, recovery_errors) = bridge.recover_pending(&mut map)?;
let mut result = bridge.incremental_sync(&mut map)?;
if !recovery_errors.is_empty() {
result.errors.extend(recovery_errors);
}
Ok(result)
}
/// Run a full reconciliation (bypassing debounce).
pub async fn reconcile_now(&self) -> Result<SyncResult, crate::error::McError> {
let bridge = self.bridge.lock().await;
let _lock = bridge.acquire_lock()?;
let mut map = bridge.load_map()?;
// Recover any pending entries first
let (_recovered, recovery_errors) = bridge.recover_pending(&mut map)?;
let mut result = bridge.full_reconciliation(&mut map)?;
if !recovery_errors.is_empty() {
result.errors.extend(recovery_errors);
}
// Update last reconciliation time
*self.last_reconciliation.lock().await = Some(Instant::now());
Ok(result)
}
/// Check if reconciliation is due (> 6 hours since last one).
pub async fn is_reconciliation_due(&self) -> bool {
let last = self.last_reconciliation.lock().await;
match *last {
Some(instant) => instant.elapsed() > Duration::from_secs(RECONCILE_INTERVAL_SECS),
None => true, // Never reconciled, so it's due
}
}
}
/// The main orchestrator loop that handles debouncing and sync.
async fn run_orchestrator_loop<L, B, F>(
mut trigger_rx: mpsc::Receiver<()>,
bridge: Arc<Mutex<Bridge<L, B>>>,
running: Arc<AtomicBool>,
last_reconciliation: Arc<Mutex<Option<Instant>>>,
on_status: Arc<F>,
) where
L: LoreCli + Send + 'static,
B: BeadsCli + Send + 'static,
F: Fn(SyncStatusEvent) + Send + Sync + 'static,
{
let mut last_trigger = Instant::now() - Duration::from_secs(10); // Allow immediate first sync
let debounce_duration = Duration::from_millis(DEBOUNCE_MS);
while running.load(Ordering::SeqCst) {
// Wait for a trigger with timeout
match tokio::time::timeout(Duration::from_secs(1), trigger_rx.recv()).await {
Ok(Some(())) => {
// Check debounce
if last_trigger.elapsed() < debounce_duration {
tracing::debug!("Sync trigger debounced");
continue;
}
last_trigger = Instant::now();
// Run sync
run_sync(&bridge, &last_reconciliation, &on_status).await;
}
Ok(None) => {
// Channel closed, exit
tracing::info!("Sync trigger channel closed, stopping orchestrator");
break;
}
Err(_) => {
// Timeout, just loop (allows checking running flag)
}
}
}
tracing::info!("Sync orchestrator stopped");
}
/// Execute a sync operation.
async fn run_sync<L, B, F>(
bridge: &Arc<Mutex<Bridge<L, B>>>,
last_reconciliation: &Arc<Mutex<Option<Instant>>>,
on_status: &Arc<F>,
) where
L: LoreCli + Send + 'static,
B: BeadsCli + Send + 'static,
F: Fn(SyncStatusEvent) + Send + Sync + 'static,
{
on_status(SyncStatusEvent::Started);
let bridge_guard = bridge.lock().await;
// Acquire lock
let lock = match bridge_guard.acquire_lock() {
Ok(l) => l,
Err(e) => {
on_status(SyncStatusEvent::Failed {
message: format!("Failed to acquire lock: {}", e),
});
return;
}
};
// Load map
let mut map = match bridge_guard.load_map() {
Ok(m) => m,
Err(e) => {
drop(lock);
on_status(SyncStatusEvent::Failed {
message: format!("Failed to load map: {}", e),
});
return;
}
};
// Recover pending
if let Err(e) = bridge_guard.recover_pending(&mut map) {
tracing::warn!("Failed to recover pending entries: {}", e);
}
// Check if reconciliation is due
let reconcile_due = {
let last = last_reconciliation.lock().await;
match *last {
Some(instant) => instant.elapsed() > Duration::from_secs(RECONCILE_INTERVAL_SECS),
None => true,
}
};
if reconcile_due {
on_status(SyncStatusEvent::ReconciliationStarted);
match bridge_guard.full_reconciliation(&mut map) {
Ok(result) => {
*last_reconciliation.lock().await = Some(Instant::now());
on_status(SyncStatusEvent::ReconciliationCompleted {
created: result.created,
closed: result.closed,
});
}
Err(e) => {
on_status(SyncStatusEvent::Failed {
message: format!("Reconciliation failed: {}", e),
});
return;
}
}
} else {
// Incremental sync
match bridge_guard.incremental_sync(&mut map) {
Ok(result) => {
on_status(SyncStatusEvent::Completed {
created: result.created,
closed: result.closed,
healed: result.healed,
});
}
Err(e) => {
on_status(SyncStatusEvent::Failed {
message: format!("Sync failed: {}", e),
});
}
}
}
drop(lock);
}
#[cfg(test)]
mod tests {
use super::*;
use crate::data::beads::MockBeadsCli;
use crate::data::lore::{LoreMeData, LoreMeResponse, MockLoreCli, SinceLastCheck};
use std::sync::atomic::AtomicUsize;
use tempfile::TempDir;
fn mock_empty_response() -> LoreMeResponse {
LoreMeResponse {
ok: true,
data: LoreMeData {
open_issues: vec![],
open_mrs_authored: vec![],
reviewing_mrs: vec![],
activity: vec![],
since_last_check: Some(SinceLastCheck {
cursor_iso: Some("2026-02-26T12:00:00Z".to_string()),
groups: vec![],
total_event_count: 0,
}),
summary: None,
username: None,
since_iso: None,
},
meta: None,
}
}
#[tokio::test]
async fn test_sync_now_returns_result() {
let dir = TempDir::new().unwrap();
let mut lore = MockLoreCli::new();
lore.expect_get_me().returning(|| Ok(mock_empty_response()));
let beads = MockBeadsCli::new();
let orch = SyncOrchestrator::with_cli_and_data_dir(lore, beads, dir.path().to_path_buf());
let result = orch.sync_now().await;
assert!(result.is_ok());
let result = result.unwrap();
assert_eq!(result.created, 0);
}
#[tokio::test]
async fn test_is_reconciliation_due_when_never_run() {
let lore = MockLoreCli::new();
let beads = MockBeadsCli::new();
let orch = SyncOrchestrator::with_cli(lore, beads);
assert!(orch.is_reconciliation_due().await);
}
#[tokio::test]
async fn test_is_reconciliation_not_due_after_recent_run() {
let mut lore = MockLoreCli::new();
lore.expect_get_me().returning(|| Ok(mock_empty_response()));
let beads = MockBeadsCli::new();
let orch = SyncOrchestrator::with_cli(lore, beads);
// Simulate recent reconciliation
*orch.last_reconciliation.lock().await = Some(Instant::now());
assert!(!orch.is_reconciliation_due().await);
}
#[tokio::test]
async fn test_orchestrator_debounces_rapid_triggers() {
let trigger_count = Arc::new(AtomicUsize::new(0));
let trigger_count_clone = Arc::clone(&trigger_count);
let mut lore = MockLoreCli::new();
lore.expect_get_me().returning(move || {
trigger_count_clone.fetch_add(1, Ordering::SeqCst);
Ok(mock_empty_response())
});
let beads = MockBeadsCli::new();
let mut orch = SyncOrchestrator::with_cli(lore, beads);
// Start the orchestrator
let trigger_tx = orch.start(|_| {});
// Send rapid triggers
for _ in 0..5 {
let _ = trigger_tx.send(()).await;
}
// Wait for debounce + processing
tokio::time::sleep(Duration::from_millis(100)).await;
// Should have only triggered once (debounced)
// Note: The actual count may vary due to timing, but should be <= 2
let count = trigger_count.load(Ordering::SeqCst);
assert!(count <= 2, "Expected debounced triggers, got {}", count);
orch.stop();
}
#[tokio::test]
async fn test_orchestrator_emits_status_events() {
let events = Arc::new(Mutex::new(Vec::new()));
let events_clone = Arc::clone(&events);
let mut lore = MockLoreCli::new();
lore.expect_get_me().returning(|| Ok(mock_empty_response()));
let beads = MockBeadsCli::new();
let mut orch = SyncOrchestrator::with_cli(lore, beads);
// Set last reconciliation to avoid triggering full reconciliation
*orch.last_reconciliation.lock().await = Some(Instant::now());
let trigger_tx = orch.start(move |event| {
let events = Arc::clone(&events_clone);
tokio::spawn(async move {
events.lock().await.push(event);
});
});
// Trigger a sync
let _ = trigger_tx.send(()).await;
// Wait for processing
tokio::time::sleep(Duration::from_millis(200)).await;
orch.stop();
// Check events
let events = events.lock().await;
assert!(!events.is_empty(), "Should have emitted at least one event");
// First event should be Started
assert!(matches!(events.first(), Some(SyncStatusEvent::Started)));
}
}