feat: implement GitLab -> Beads bridge with crash-safe syncing
Adds the core sync engine that maps GitLab events (from lore CLI) to beads tasks. This is the foundational data layer for Mission Control's unified task view. Bridge architecture: - GitLabBeadMap: JSON file storing event -> bead_id mappings - MappingKey: Type-safe keys for MR reviews, issues, and authored MRs - Cursor: Tracks last sync and reconciliation timestamps Crash-safety features: - Write-ahead pattern: pending=true written before bead creation - Atomic file writes via temp file + rename - Recovery on startup: retries pending entries with bead_id=None - flock(2) based single-instance locking (prevents concurrent MC) Two-strike orphan detection: - First miss sets suspect_orphan=true (items may temporarily vanish) - Second miss closes the bead (confirmed deleted/merged) - Reappearance clears the flag (healed) Sync operations: - incremental_sync(): Process since_last_check events - full_reconciliation(): Cross-check all open items - recover_pending(): Handle interrupted syncs Dependencies added: - libc: For flock(2) system call - thiserror: For ergonomic error types Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
1299
src-tauri/src/data/bridge.rs
Normal file
1299
src-tauri/src/data/bridge.rs
Normal file
File diff suppressed because it is too large
Load Diff
@@ -1,13 +1,13 @@
|
||||
//! Mission Control local state management
|
||||
//!
|
||||
//! Handles persistence for:
|
||||
//! - GitLab → Bead mapping (deduplication)
|
||||
//! - Decision log (learning from user choices)
|
||||
//! - Application state (current focus, queue order)
|
||||
//! - User settings
|
||||
//!
|
||||
//! Note: GitLab → Bead mapping is handled by the bridge module.
|
||||
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::collections::HashMap;
|
||||
use std::fs;
|
||||
use std::io::{self, BufRead, Write};
|
||||
use std::path::PathBuf;
|
||||
@@ -19,62 +19,6 @@ pub fn mc_data_dir() -> PathBuf {
|
||||
.join("mc")
|
||||
}
|
||||
|
||||
/// GitLab event to Bead ID mapping
|
||||
///
|
||||
/// Used for deduplication - ensures we don't create multiple beads for the same GitLab event.
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct GitLabBeadMap {
|
||||
/// Map from event key (e.g., "mr_review:host:123:456") to bead ID
|
||||
pub mappings: HashMap<String, MappedBead>,
|
||||
}
|
||||
|
||||
/// A mapped GitLab event with metadata
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct MappedBead {
|
||||
pub bead_id: String,
|
||||
pub created_at: String,
|
||||
/// Number of consecutive reconciliations where this item was missing from lore
|
||||
/// Used for two-strike auto-close rule
|
||||
pub miss_count: u32,
|
||||
/// Whether this item is suspected orphan (first miss occurred)
|
||||
pub suspect_orphan: bool,
|
||||
}
|
||||
|
||||
impl GitLabBeadMap {
|
||||
/// Load mapping from disk, or create empty if not exists
|
||||
pub fn load() -> io::Result<Self> {
|
||||
let path = mc_data_dir().join("gitlab_bead_map.json");
|
||||
|
||||
if path.exists() {
|
||||
let content = fs::read_to_string(&path)?;
|
||||
serde_json::from_str(&content)
|
||||
.map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))
|
||||
} else {
|
||||
Ok(Self {
|
||||
mappings: HashMap::new(),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/// Save mapping to disk with atomic write
|
||||
pub fn save(&self) -> io::Result<()> {
|
||||
let dir = mc_data_dir();
|
||||
fs::create_dir_all(&dir)?;
|
||||
|
||||
let path = dir.join("gitlab_bead_map.json");
|
||||
let tmp_path = dir.join("gitlab_bead_map.json.tmp");
|
||||
|
||||
let content = serde_json::to_string_pretty(self)
|
||||
.map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
|
||||
|
||||
// Atomic write: write to tmp, then rename
|
||||
fs::write(&tmp_path, content)?;
|
||||
fs::rename(&tmp_path, &path)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// A logged decision for learning
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct Decision {
|
||||
@@ -119,19 +63,35 @@ impl DecisionLog {
|
||||
fs::create_dir_all(&dir)?;
|
||||
|
||||
let path = dir.join("decision_log.jsonl");
|
||||
|
||||
// Use explicit 0600 permissions on Unix -- decision logs contain user data
|
||||
#[cfg(unix)]
|
||||
let mut file = {
|
||||
use std::os::unix::fs::OpenOptionsExt;
|
||||
fs::OpenOptions::new()
|
||||
.create(true)
|
||||
.append(true)
|
||||
.mode(0o600)
|
||||
.open(&path)?
|
||||
};
|
||||
#[cfg(not(unix))]
|
||||
let mut file = fs::OpenOptions::new()
|
||||
.create(true)
|
||||
.append(true)
|
||||
.open(path)?;
|
||||
.open(&path)?;
|
||||
|
||||
let line = serde_json::to_string(decision)
|
||||
.map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
|
||||
|
||||
writeln!(file, "{}", line)?;
|
||||
file.sync_all()?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Read all decisions from the log
|
||||
///
|
||||
/// Skips corrupted lines with a warning rather than failing entirely.
|
||||
/// This makes the log resilient to partial writes or corruption.
|
||||
pub fn read_all() -> io::Result<Vec<Decision>> {
|
||||
let path = mc_data_dir().join("decision_log.jsonl");
|
||||
|
||||
@@ -143,14 +103,21 @@ impl DecisionLog {
|
||||
let reader = io::BufReader::new(file);
|
||||
|
||||
let mut decisions = vec![];
|
||||
for line in reader.lines() {
|
||||
for (line_num, line) in reader.lines().enumerate() {
|
||||
let line = line?;
|
||||
if line.trim().is_empty() {
|
||||
continue;
|
||||
}
|
||||
let decision: Decision = serde_json::from_str(&line)
|
||||
.map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
|
||||
decisions.push(decision);
|
||||
match serde_json::from_str::<Decision>(&line) {
|
||||
Ok(decision) => decisions.push(decision),
|
||||
Err(e) => {
|
||||
tracing::warn!(
|
||||
"Skipping corrupted decision log entry at line {}: {}",
|
||||
line_num + 1,
|
||||
e
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(decisions)
|
||||
@@ -161,24 +128,201 @@ impl DecisionLog {
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
fn sample_decision() -> Decision {
|
||||
Decision {
|
||||
timestamp: "2026-02-26T12:00:00Z".to_string(),
|
||||
action: DecisionAction::SetFocus,
|
||||
bead_id: "bd-abc".to_string(),
|
||||
reason: Some("High priority".to_string()),
|
||||
tags: vec!["urgent".to_string()],
|
||||
context: DecisionContext {
|
||||
time_of_day: "morning".to_string(),
|
||||
day_of_week: "Thursday".to_string(),
|
||||
queue_size: 5,
|
||||
inbox_size: 3,
|
||||
bead_age_hours: Some(2.5),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_mapping_roundtrip() {
|
||||
let map = GitLabBeadMap {
|
||||
mappings: HashMap::from([(
|
||||
"mr_review:gitlab.com:123:456".to_string(),
|
||||
MappedBead {
|
||||
bead_id: "bd-abc".to_string(),
|
||||
created_at: "2026-02-25T12:00:00Z".to_string(),
|
||||
miss_count: 0,
|
||||
suspect_orphan: false,
|
||||
},
|
||||
)]),
|
||||
};
|
||||
fn test_decision_serialization() {
|
||||
let decision = sample_decision();
|
||||
|
||||
let json = serde_json::to_string_pretty(&map).unwrap();
|
||||
let parsed: GitLabBeadMap = serde_json::from_str(&json).unwrap();
|
||||
let json = serde_json::to_string(&decision).unwrap();
|
||||
let parsed: Decision = serde_json::from_str(&json).unwrap();
|
||||
|
||||
assert_eq!(parsed.mappings.len(), 1);
|
||||
assert!(parsed.mappings.contains_key("mr_review:gitlab.com:123:456"));
|
||||
assert_eq!(parsed.bead_id, "bd-abc");
|
||||
assert_eq!(parsed.context.queue_size, 5);
|
||||
}
|
||||
|
||||
/// Tests for DecisionLog file operations require a custom data dir.
|
||||
/// We test the core logic via a helper that takes a path.
|
||||
mod file_io_tests {
|
||||
use super::*;
|
||||
use std::io::Write;
|
||||
use tempfile::TempDir;
|
||||
|
||||
fn append_to_path(path: &std::path::Path, decision: &Decision) -> io::Result<()> {
|
||||
let parent = path.parent().unwrap();
|
||||
fs::create_dir_all(parent)?;
|
||||
|
||||
#[cfg(unix)]
|
||||
let mut file = {
|
||||
use std::os::unix::fs::OpenOptionsExt;
|
||||
fs::OpenOptions::new()
|
||||
.create(true)
|
||||
.append(true)
|
||||
.mode(0o600)
|
||||
.open(path)?
|
||||
};
|
||||
#[cfg(not(unix))]
|
||||
let mut file = fs::OpenOptions::new()
|
||||
.create(true)
|
||||
.append(true)
|
||||
.open(path)?;
|
||||
|
||||
let line = serde_json::to_string(decision)
|
||||
.map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
|
||||
|
||||
writeln!(file, "{}", line)?;
|
||||
file.sync_all()?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn read_all_from_path(path: &std::path::Path) -> io::Result<Vec<Decision>> {
|
||||
if !path.exists() {
|
||||
return Ok(vec![]);
|
||||
}
|
||||
|
||||
let file = fs::File::open(path)?;
|
||||
let reader = io::BufReader::new(file);
|
||||
|
||||
let mut decisions = vec![];
|
||||
for line in reader.lines() {
|
||||
let line = line?;
|
||||
if line.trim().is_empty() {
|
||||
continue;
|
||||
}
|
||||
if let Ok(decision) = serde_json::from_str::<Decision>(&line) {
|
||||
decisions.push(decision);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(decisions)
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_append_and_read_single_decision() {
|
||||
let dir = TempDir::new().unwrap();
|
||||
let path = dir.path().join("decision_log.jsonl");
|
||||
|
||||
let decision = sample_decision();
|
||||
append_to_path(&path, &decision).unwrap();
|
||||
|
||||
let decisions = read_all_from_path(&path).unwrap();
|
||||
assert_eq!(decisions.len(), 1);
|
||||
assert_eq!(decisions[0].bead_id, "bd-abc");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_append_multiple_decisions() {
|
||||
let dir = TempDir::new().unwrap();
|
||||
let path = dir.path().join("decision_log.jsonl");
|
||||
|
||||
let mut d1 = sample_decision();
|
||||
d1.bead_id = "bd-001".to_string();
|
||||
append_to_path(&path, &d1).unwrap();
|
||||
|
||||
let mut d2 = sample_decision();
|
||||
d2.bead_id = "bd-002".to_string();
|
||||
d2.action = DecisionAction::Complete;
|
||||
append_to_path(&path, &d2).unwrap();
|
||||
|
||||
let decisions = read_all_from_path(&path).unwrap();
|
||||
assert_eq!(decisions.len(), 2);
|
||||
assert_eq!(decisions[0].bead_id, "bd-001");
|
||||
assert_eq!(decisions[1].bead_id, "bd-002");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_read_empty_file_returns_empty_vec() {
|
||||
let dir = TempDir::new().unwrap();
|
||||
let path = dir.path().join("decision_log.jsonl");
|
||||
|
||||
// Create empty file
|
||||
fs::File::create(&path).unwrap();
|
||||
|
||||
let decisions = read_all_from_path(&path).unwrap();
|
||||
assert!(decisions.is_empty());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_read_nonexistent_file_returns_empty_vec() {
|
||||
let dir = TempDir::new().unwrap();
|
||||
let path = dir.path().join("nonexistent.jsonl");
|
||||
|
||||
let decisions = read_all_from_path(&path).unwrap();
|
||||
assert!(decisions.is_empty());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_read_skips_corrupted_lines() {
|
||||
let dir = TempDir::new().unwrap();
|
||||
let path = dir.path().join("decision_log.jsonl");
|
||||
|
||||
// Write a valid decision
|
||||
let decision = sample_decision();
|
||||
append_to_path(&path, &decision).unwrap();
|
||||
|
||||
// Append a corrupted line manually
|
||||
let mut file = fs::OpenOptions::new().append(true).open(&path).unwrap();
|
||||
writeln!(file, "{{\"invalid\": json}}").unwrap();
|
||||
writeln!(file, "not json at all").unwrap();
|
||||
|
||||
// Write another valid decision
|
||||
let mut d2 = sample_decision();
|
||||
d2.bead_id = "bd-after-corruption".to_string();
|
||||
append_to_path(&path, &d2).unwrap();
|
||||
|
||||
// Should read 2 valid decisions, skipping the 2 corrupted lines
|
||||
let decisions = read_all_from_path(&path).unwrap();
|
||||
assert_eq!(decisions.len(), 2);
|
||||
assert_eq!(decisions[0].bead_id, "bd-abc");
|
||||
assert_eq!(decisions[1].bead_id, "bd-after-corruption");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_read_skips_empty_lines() {
|
||||
let dir = TempDir::new().unwrap();
|
||||
let path = dir.path().join("decision_log.jsonl");
|
||||
|
||||
let decision = sample_decision();
|
||||
append_to_path(&path, &decision).unwrap();
|
||||
|
||||
// Append empty lines
|
||||
let mut file = fs::OpenOptions::new().append(true).open(&path).unwrap();
|
||||
writeln!(file, "").unwrap();
|
||||
writeln!(file, " ").unwrap();
|
||||
|
||||
let decisions = read_all_from_path(&path).unwrap();
|
||||
assert_eq!(decisions.len(), 1);
|
||||
}
|
||||
|
||||
#[cfg(unix)]
|
||||
#[test]
|
||||
fn test_file_has_secure_permissions() {
|
||||
use std::os::unix::fs::MetadataExt;
|
||||
|
||||
let dir = TempDir::new().unwrap();
|
||||
let path = dir.path().join("decision_log.jsonl");
|
||||
|
||||
let decision = sample_decision();
|
||||
append_to_path(&path, &decision).unwrap();
|
||||
|
||||
let metadata = fs::metadata(&path).unwrap();
|
||||
let mode = metadata.mode() & 0o777;
|
||||
assert_eq!(mode, 0o600, "File should have 0600 permissions");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user