feat: TUI Phase 1 common widgets + scoring/path beads
bd-26f2: Common widgets (render_breadcrumb, render_status_bar, render_loading, render_error_toast, render_help_overlay) + render_screen top-level dispatch wired to LoreApp::view(). 27 widget tests. bd-2w1p: Add half-life fields to ScoringConfig with validation. bd-1soz: Add half_life_decay() pure function. bd-18dn: Add normalize_query_path() for path canonicalization. Phase 1 modules: CommandRegistry, NavigationStack, CrashContext, TaskSupervisor, AppState with per-screen states. 172 lore-tui tests passing, clippy clean, fmt clean.
This commit is contained in:
380
crates/lore-tui/src/task_supervisor.rs
Normal file
380
crates/lore-tui/src/task_supervisor.rs
Normal file
@@ -0,0 +1,380 @@
|
||||
#![allow(dead_code)] // Phase 1: consumed by LoreApp in bd-6pmy
|
||||
|
||||
//! Centralized background task management with dedup and cancellation.
|
||||
//!
|
||||
//! All background work (DB queries, sync, search) flows through
|
||||
//! [`TaskSupervisor`]. Submitting a task with a key that already has an
|
||||
//! active handle cancels the previous task via its [`CancelToken`] and
|
||||
//! bumps the generation counter.
|
||||
//!
|
||||
//! Generation IDs enable stale-result detection: when an async result
|
||||
//! arrives, [`is_current`] checks whether the result's generation
|
||||
//! matches the latest submission for that key.
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
|
||||
|
||||
use crate::message::Screen;
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// TaskKey
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
/// Deduplication key for background tasks.
|
||||
///
|
||||
/// Two tasks with the same key cannot run concurrently — submitting a
|
||||
/// new task with an existing key cancels the previous one.
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
|
||||
pub enum TaskKey {
|
||||
/// Load data for a specific screen.
|
||||
LoadScreen(Screen),
|
||||
/// Global search query.
|
||||
Search,
|
||||
/// Sync stream (only one at a time).
|
||||
SyncStream,
|
||||
/// Re-query after filter change on a specific screen.
|
||||
FilterRequery(Screen),
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// TaskPriority
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
/// Priority levels for task scheduling.
|
||||
///
|
||||
/// Lower numeric value = higher priority.
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
|
||||
pub enum TaskPriority {
|
||||
/// User-initiated input (highest priority).
|
||||
Input = 0,
|
||||
/// Navigation-triggered data load.
|
||||
Navigation = 1,
|
||||
/// Background refresh / prefetch (lowest priority).
|
||||
Background = 2,
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// CancelToken
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
/// Thread-safe cooperative cancellation flag.
|
||||
///
|
||||
/// Background tasks poll [`is_cancelled`] periodically and exit early
|
||||
/// when it returns `true`.
|
||||
#[derive(Debug)]
|
||||
pub struct CancelToken {
|
||||
cancelled: AtomicBool,
|
||||
}
|
||||
|
||||
impl CancelToken {
|
||||
/// Create a new, non-cancelled token.
|
||||
#[must_use]
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
cancelled: AtomicBool::new(false),
|
||||
}
|
||||
}
|
||||
|
||||
/// Signal cancellation.
|
||||
pub fn cancel(&self) {
|
||||
self.cancelled.store(true, Ordering::Relaxed);
|
||||
}
|
||||
|
||||
/// Check whether cancellation has been requested.
|
||||
#[must_use]
|
||||
pub fn is_cancelled(&self) -> bool {
|
||||
self.cancelled.load(Ordering::Relaxed)
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for CancelToken {
|
||||
fn default() -> Self {
|
||||
Self::new()
|
||||
}
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// InterruptHandle
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
/// Opaque handle for interrupting a rusqlite operation.
|
||||
///
|
||||
/// Wraps the rusqlite `InterruptHandle` so the supervisor can cancel
|
||||
/// long-running queries. This is only set for tasks that lease a reader
|
||||
/// connection from [`DbManager`](crate::db::DbManager).
|
||||
pub struct InterruptHandle {
|
||||
handle: rusqlite::InterruptHandle,
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for InterruptHandle {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
f.debug_struct("InterruptHandle").finish_non_exhaustive()
|
||||
}
|
||||
}
|
||||
|
||||
impl InterruptHandle {
|
||||
/// Wrap a rusqlite interrupt handle.
|
||||
#[must_use]
|
||||
pub fn new(handle: rusqlite::InterruptHandle) -> Self {
|
||||
Self { handle }
|
||||
}
|
||||
|
||||
/// Interrupt the associated SQLite operation.
|
||||
pub fn interrupt(&self) {
|
||||
self.handle.interrupt();
|
||||
}
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// TaskHandle
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
/// Handle returned when a task is submitted.
|
||||
///
|
||||
/// Callers use this to pass the generation ID into async work so
|
||||
/// results can be tagged and checked for staleness.
|
||||
#[derive(Debug)]
|
||||
pub struct TaskHandle {
|
||||
/// Dedup key for this task.
|
||||
pub key: TaskKey,
|
||||
/// Monotonically increasing generation for stale detection.
|
||||
pub generation: u64,
|
||||
/// Cooperative cancellation token (shared with the supervisor).
|
||||
pub cancel: Arc<CancelToken>,
|
||||
/// Optional SQLite interrupt handle for long queries.
|
||||
pub interrupt: Option<InterruptHandle>,
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// TaskSupervisor
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
/// Manages background tasks with deduplication and cancellation.
|
||||
///
|
||||
/// Only one task per [`TaskKey`] can be active. Submitting a new task
|
||||
/// with an existing key cancels the previous one (via its cancel token
|
||||
/// and optional interrupt handle) before registering the new handle.
|
||||
pub struct TaskSupervisor {
|
||||
active: HashMap<TaskKey, TaskHandle>,
|
||||
next_generation: AtomicU64,
|
||||
}
|
||||
|
||||
impl TaskSupervisor {
|
||||
/// Create a new supervisor with no active tasks.
|
||||
#[must_use]
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
active: HashMap::new(),
|
||||
next_generation: AtomicU64::new(1),
|
||||
}
|
||||
}
|
||||
|
||||
/// Submit a new task, cancelling any existing task with the same key.
|
||||
///
|
||||
/// Returns a [`TaskHandle`] with a fresh generation ID and a shared
|
||||
/// cancel token. The caller clones the `Arc<CancelToken>` and passes
|
||||
/// it into the async work.
|
||||
pub fn submit(&mut self, key: TaskKey) -> &TaskHandle {
|
||||
// Cancel existing task with this key, if any.
|
||||
if let Some(old) = self.active.remove(&key) {
|
||||
old.cancel.cancel();
|
||||
if let Some(interrupt) = &old.interrupt {
|
||||
interrupt.interrupt();
|
||||
}
|
||||
}
|
||||
|
||||
let generation = self.next_generation.fetch_add(1, Ordering::Relaxed);
|
||||
let cancel = Arc::new(CancelToken::new());
|
||||
|
||||
let handle = TaskHandle {
|
||||
key: key.clone(),
|
||||
generation,
|
||||
cancel,
|
||||
interrupt: None,
|
||||
};
|
||||
|
||||
self.active.insert(key.clone(), handle);
|
||||
self.active.get(&key).expect("just inserted")
|
||||
}
|
||||
|
||||
/// Check whether a generation is current for a given key.
|
||||
///
|
||||
/// Returns `true` only if the key has an active handle with the
|
||||
/// specified generation.
|
||||
#[must_use]
|
||||
pub fn is_current(&self, key: &TaskKey, generation: u64) -> bool {
|
||||
self.active
|
||||
.get(key)
|
||||
.is_some_and(|h| h.generation == generation)
|
||||
}
|
||||
|
||||
/// Mark a task as complete, removing its handle.
|
||||
///
|
||||
/// Only removes the handle if the generation matches the active one.
|
||||
/// This prevents a late-arriving completion from removing a newer
|
||||
/// task's handle.
|
||||
pub fn complete(&mut self, key: &TaskKey, generation: u64) {
|
||||
if self.is_current(key, generation) {
|
||||
self.active.remove(key);
|
||||
}
|
||||
}
|
||||
|
||||
/// Cancel all active tasks.
|
||||
///
|
||||
/// Used during shutdown to ensure background work stops promptly.
|
||||
pub fn cancel_all(&mut self) {
|
||||
for (_, handle) in self.active.drain() {
|
||||
handle.cancel.cancel();
|
||||
if let Some(interrupt) = &handle.interrupt {
|
||||
interrupt.interrupt();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Number of currently active tasks.
|
||||
#[must_use]
|
||||
pub fn active_count(&self) -> usize {
|
||||
self.active.len()
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for TaskSupervisor {
|
||||
fn default() -> Self {
|
||||
Self::new()
|
||||
}
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Tests
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_submit_cancels_previous() {
|
||||
let mut sup = TaskSupervisor::new();
|
||||
|
||||
let gen1 = sup.submit(TaskKey::Search).generation;
|
||||
let cancel1 = sup.active.get(&TaskKey::Search).unwrap().cancel.clone();
|
||||
|
||||
let gen2 = sup.submit(TaskKey::Search).generation;
|
||||
|
||||
// First task's token should be cancelled.
|
||||
assert!(cancel1.is_cancelled());
|
||||
// Second task should have a different (higher) generation.
|
||||
assert!(gen2 > gen1);
|
||||
// Only one active task for this key.
|
||||
assert_eq!(sup.active_count(), 1);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_is_current_after_supersede() {
|
||||
let mut sup = TaskSupervisor::new();
|
||||
|
||||
let gen1 = sup.submit(TaskKey::Search).generation;
|
||||
let gen2 = sup.submit(TaskKey::Search).generation;
|
||||
|
||||
assert!(!sup.is_current(&TaskKey::Search, gen1));
|
||||
assert!(sup.is_current(&TaskKey::Search, gen2));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_complete_removes_handle() {
|
||||
let mut sup = TaskSupervisor::new();
|
||||
let generation = sup.submit(TaskKey::Search).generation;
|
||||
|
||||
assert_eq!(sup.active_count(), 1);
|
||||
sup.complete(&TaskKey::Search, generation);
|
||||
assert_eq!(sup.active_count(), 0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_complete_ignores_stale() {
|
||||
let mut sup = TaskSupervisor::new();
|
||||
|
||||
let gen1 = sup.submit(TaskKey::Search).generation;
|
||||
let gen2 = sup.submit(TaskKey::Search).generation;
|
||||
|
||||
// Completing with old generation should NOT remove the newer handle.
|
||||
sup.complete(&TaskKey::Search, gen1);
|
||||
assert_eq!(sup.active_count(), 1);
|
||||
assert!(sup.is_current(&TaskKey::Search, gen2));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_generation_monotonic() {
|
||||
let mut sup = TaskSupervisor::new();
|
||||
|
||||
let g1 = sup.submit(TaskKey::Search).generation;
|
||||
let g2 = sup.submit(TaskKey::SyncStream).generation;
|
||||
let g3 = sup.submit(TaskKey::Search).generation;
|
||||
|
||||
assert!(g1 < g2);
|
||||
assert!(g2 < g3);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_different_keys_coexist() {
|
||||
let mut sup = TaskSupervisor::new();
|
||||
|
||||
sup.submit(TaskKey::Search);
|
||||
sup.submit(TaskKey::SyncStream);
|
||||
sup.submit(TaskKey::LoadScreen(Screen::Dashboard));
|
||||
|
||||
assert_eq!(sup.active_count(), 3);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_cancel_all() {
|
||||
let mut sup = TaskSupervisor::new();
|
||||
|
||||
let cancel_search = {
|
||||
sup.submit(TaskKey::Search);
|
||||
sup.active.get(&TaskKey::Search).unwrap().cancel.clone()
|
||||
};
|
||||
let cancel_sync = {
|
||||
sup.submit(TaskKey::SyncStream);
|
||||
sup.active.get(&TaskKey::SyncStream).unwrap().cancel.clone()
|
||||
};
|
||||
|
||||
sup.cancel_all();
|
||||
|
||||
assert!(cancel_search.is_cancelled());
|
||||
assert!(cancel_sync.is_cancelled());
|
||||
assert_eq!(sup.active_count(), 0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_cancel_token_default_is_not_cancelled() {
|
||||
let token = CancelToken::new();
|
||||
assert!(!token.is_cancelled());
|
||||
token.cancel();
|
||||
assert!(token.is_cancelled());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_cancel_token_is_send_sync() {
|
||||
fn assert_send_sync<T: Send + Sync>() {}
|
||||
assert_send_sync::<CancelToken>();
|
||||
assert_send_sync::<Arc<CancelToken>>();
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_task_supervisor_default() {
|
||||
let sup = TaskSupervisor::default();
|
||||
assert_eq!(sup.active_count(), 0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_filter_requery_key_distinct_per_screen() {
|
||||
let mut sup = TaskSupervisor::new();
|
||||
|
||||
sup.submit(TaskKey::FilterRequery(Screen::IssueList));
|
||||
sup.submit(TaskKey::FilterRequery(Screen::MrList));
|
||||
|
||||
assert_eq!(sup.active_count(), 2);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user