9 Commits

Taylor Eernisse
5c521491b7 chore(beads): Update issue tracker state for Gate 1 completions
Closes bd-hu3, bd-2e8, bd-2fm, bd-sqw, bd-1uc, bd-tir, bd-3sh, bd-1m8.
All Gate 1 resource events infrastructure beads except bd-1ep (pipeline
wiring) are now complete.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-03 12:08:23 -05:00
Taylor Eernisse
0236ef2776 feat(stats): Extend --check with event FK integrity and queue health diagnostics
Adds two new categories of integrity checks to 'lore stats --check':

Event FK integrity (3 queries):
- Detects orphaned resource_state_events where issue_id or
  merge_request_id points to a non-existent parent entity
- Same check for resource_label_events and resource_milestone_events
- Under normal CASCADE operation these should always be zero; non-zero
  indicates manual DB edits, bugs, or partial migration state

Queue health diagnostics:
- pending_dependent_fetches counts: pending, failed, and stuck (locked)
- queue_stuck_locks: Jobs with locked_at set (potential worker crashes)
- queue_max_attempts: Highest retry count across all jobs (signals
  permanently failing jobs when > 3)

New IntegrityResult fields: orphan_state_events, orphan_label_events,
orphan_milestone_events, queue_stuck_locks, queue_max_attempts.

New QueueStats fields: pending_dependent_fetches,
pending_dependent_fetches_failed, pending_dependent_fetches_stuck.

Human output shows colored PASS/WARN/FAIL indicators:
- Red "!" for orphaned events (integrity failure)
- Yellow "!" for stuck locks and high retry counts (warnings)
- Dependent fetch queue line only shown when non-zero

All new queries are guarded by table_exists() checks for graceful
degradation on databases without migration 011 applied.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-03 12:08:15 -05:00
Taylor Eernisse
12811683ca feat(cli): Add 'lore count events' command with human and robot output
Extends the count command to support "events" as an entity type,
displaying resource event counts broken down by event type (state,
label, milestone) and entity type (issue, merge request).

New functions in count.rs:
- run_count_events: Creates DB connection and delegates to
  events_db::count_events for the actual queries
- print_event_count: Human-readable table with aligned columns
  showing per-type breakdowns and row/column totals
- print_event_count_json: Structured JSON matching the robot mode
  contract with ok/data envelope and per-type issue/mr/total counts

JSON output structure:
  {"ok":true,"data":{"state_events":{"issue":N,"merge_request":N,
  "total":N},"label_events":{...},"milestone_events":{...},"total":N}}

Updated exports in commands/mod.rs to expose the three new public
functions (run_count_events, print_event_count, print_event_count_json).

The "events" branch in handle_count (main.rs, committed earlier)
routes to these functions before the existing entity type dispatcher.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-03 12:08:01 -05:00
Taylor Eernisse
724be4d265 feat(queue): Add generic dependent fetch queue with exponential backoff
New module src/core/dependent_queue.rs provides job queue operations
against the pending_dependent_fetches table. Designed for second-pass
fetches that depend on primary entity ingestion (resource events,
MR close references, MR file diffs).

Queue operations:
- enqueue_job: Idempotent INSERT OR IGNORE keyed on the UNIQUE
  (project_id, entity_type, entity_iid, job_type) constraint.
  Returns bool indicating whether the row was actually inserted.

- claim_jobs: Two-phase claim — SELECT available jobs (unlocked,
  past retry window) then UPDATE locked_at in batch. Orders by
  enqueued_at ASC for FIFO processing within a job type.

- complete_job: DELETE the row on successful processing.

- fail_job: Increments attempts, calculates exponential backoff
  (30s * 2^(attempts-1), capped at 480s), sets next_retry_at,
  clears locked_at, and records the error message. Reads current
  attempts via query with unwrap_or(0) fallback for robustness.

- reclaim_stale_locks: Clears locked_at on jobs locked longer than
  a configurable threshold, recovering from worker crashes.

- count_pending_jobs: GROUP BY job_type aggregation for progress
  reporting and stats display.
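The retry schedule implied by the fail_job backoff formula can be sketched as a standalone function (a sketch of the schedule only — the actual helper is inlined in fail_job):

```rust
/// Backoff delay in milliseconds for a 1-based attempt count:
/// 30s * 2^(attempts-1), capped at 480s (i.e. 30s, 60s, 120s, 240s, 480s, 480s, ...).
fn backoff_ms(attempts: u32) -> i64 {
    // Cap the shift at 4 so the delay never exceeds the 480s ceiling
    // and the shift cannot overflow for large attempt counts.
    let exp = attempts.saturating_sub(1).min(4);
    (30_000i64 << exp).min(480_000)
}
```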

Registers both events_db and dependent_queue in src/core/mod.rs.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-03 12:07:48 -05:00
Taylor Eernisse
c34ed3007e feat(db): Add event upsert functions and count queries in events_db module
New module src/core/events_db.rs provides database operations for
resource events:

- upsert_state_events: Batch INSERT OR REPLACE for state change events,
  keyed on UNIQUE(gitlab_id, project_id). Wraps in a savepoint for
  atomicity per entity batch. Maps GitLabStateEvent fields including
  optional user, source_commit, and source_merge_request_iid.

- upsert_label_events: Same pattern for label add/remove events,
  extracting label.name for denormalized storage.

- upsert_milestone_events: Same pattern for milestone assignment events,
  storing both milestone.title and milestone.id.

All three upsert functions:
- Take &mut Connection (required for savepoint creation)
- Use prepare_cached for statement reuse across batch iterations
- Convert ISO timestamps via iso_to_ms_strict for ms-epoch storage
- Propagate rusqlite errors via the #[from] LoreError::Database path
- Return the count of events processed

Supporting functions:
- resolve_entity_ids: Maps entity_type string to (issue_id, MR_id) pair
  with exactly-one-non-NULL invariant matching the CHECK constraints
- count_events: Queries all three event tables with conditional COUNT
  aggregations, returning EventCounts struct. Uses unwrap_or((0, 0))
  for graceful degradation when tables don't exist (pre-migration 011).
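The exactly-one-non-NULL invariant can be illustrated with a minimal sketch; the real resolve_entity_ids signature and error type are not shown in this diff, so this free-function form is an assumption:

```rust
/// Map an entity_type string to the (issue_id, merge_request_id) pair stored on
/// event rows. Exactly one side is Some, mirroring the table CHECK constraints.
fn resolve_entity_ids(entity_type: &str, local_id: i64) -> Option<(Option<i64>, Option<i64>)> {
    match entity_type {
        "issue" => Some((Some(local_id), None)),
        "merge_request" => Some((None, Some(local_id))),
        _ => None, // unknown entity types are rejected rather than stored
    }
}
```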

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-03 12:07:34 -05:00
Taylor Eernisse
e73d2907dc feat(client): Add Resource Events API endpoints with generic paginated fetcher
Extends GitLabClient with methods for fetching resource events from
GitLab's per-entity API endpoints. Adds a new impl block containing:

- fetch_all_pages<T>: Generic paginated collector that handles
  x-next-page header parsing with fallback to page-size heuristics.
  Uses per_page=100 and respects the existing rate limiter via
  request_with_headers. Terminates when: (a) x-next-page header is
  absent/stale, (b) response is empty, or (c) page is not full.

- Six typed endpoint methods:
  - fetch_issue_state_events / fetch_mr_state_events
  - fetch_issue_label_events / fetch_mr_label_events
  - fetch_issue_milestone_events / fetch_mr_milestone_events

- fetch_all_resource_events: Convenience method that fetches all three
  event types for an entity (issue or merge_request) in sequence,
  returning a tuple of (state, label, milestone) event vectors.
  Routes to issue or MR endpoints based on entity_type string.

All methods follow the existing client patterns: path formatting with
gitlab_project_id and iid, error propagation via Result, and rate
limiter integration through the shared request_with_headers path.
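The termination rules for fetch_all_pages can be condensed into a predicate like the following — a standalone sketch under the conditions listed above, not the client's actual code, which operates on HTTP responses:

```rust
/// Return the next page number to fetch, or None when pagination should stop:
/// (a) x-next-page absent or not advancing, (b) empty response, (c) short page.
fn next_page_to_fetch(
    x_next_page: Option<&str>,
    current_page: u32,
    returned: usize,
    per_page: usize,
) -> Option<u32> {
    if returned == 0 || returned < per_page {
        return None; // (b) empty or (c) not a full page
    }
    match x_next_page.and_then(|v| v.trim().parse::<u32>().ok()) {
        Some(next) if next > current_page => Some(next),
        _ => None, // (a) header absent or stale despite a full page
    }
}
```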

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-03 12:07:19 -05:00
Taylor Eernisse
9d4755521f feat(config): Add fetchResourceEvents config flag with --no-events CLI override
Adds a new boolean field to SyncConfig that controls whether resource
event fetching is performed during sync:

- SyncConfig.fetch_resource_events: defaults to true via serde
  default_true helper, serialized as "fetchResourceEvents" in JSON
- SyncArgs.no_events: --no-events CLI flag that overrides the config
  value to false when present
- SyncOptions.no_events: propagates the flag through the sync pipeline
- handle_sync_cmd: mutates loaded config when --no-events is set,
  ensuring the flag takes effect regardless of config file contents

This follows the existing pattern established by --no-embed and
--no-docs flags, where CLI flags override config file defaults.
The config is loaded as mutable specifically to support this override.

Also adds "events" to the count command's entity type value_parser,
enabling `lore count events` (implementation in a separate commit).
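Disabling event fetching from the config file might look like this — a hedged example: only the "fetchResourceEvents" key comes from the serde rename above; the enclosing "sync" section name is an assumption about the config layout.

```json
{
  "sync": {
    "fetchResourceEvents": false
  }
}
```

Omitting the key leaves the default of true via default_true; `lore sync --no-events` forces it to false regardless of the file contents.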

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-03 12:07:06 -05:00
Taylor Eernisse
92ff255909 feat(types): Add GitLab Resource Event serde types with deserialization tests
Adds six new types for deserializing responses from GitLab's three
Resource Events API endpoints (state, label, milestone):

- GitLabStateEvent: State transitions with optional user, source_commit,
  and source_merge_request reference
- GitLabLabelEvent: Label add/remove events with nested GitLabLabelRef
- GitLabMilestoneEvent: Milestone assignment changes with nested
  GitLabMilestoneRef
- GitLabMergeRequestRef: Lightweight MR reference (iid, title, web_url)
- GitLabLabelRef: Label metadata (id, name, color, description)
- GitLabMilestoneRef: Milestone metadata (id, iid, title)

All types derive Deserialize + Serialize and use Option<T> for nullable
fields (user, source_commit, color, description) to match GitLab's API
contract where these fields may be null.

Includes 8 new test cases covering:
- State events with/without user, with/without source_merge_request
- Label events for add and remove actions, including null color handling
- Milestone event deserialization
- Standalone ref type deserialization (MR, label, milestone)

Uses r##"..."## raw string delimiters where JSON contains hex color
codes (#FF0000) that would conflict with r#"..."# delimiters.
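A minimal illustration of the delimiter choice (the literal here is invented for illustration, not one of the commit's test fixtures):

```rust
// With r#"..."#, the `"#` sequence inside `"color":"#FF0000"` would close the
// literal early; doubling the hashes keeps the hex color code intact.
const LABEL_JSON: &str = r##"{"name":"bug","color":"#FF0000"}"##;
```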

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-03 12:06:56 -05:00
Taylor Eernisse
ce5cd9c95d feat(schema): Add migration 011 for resource events, entity references, and dependent fetch queue
Introduces five new tables that power temporal queries (timeline,
file-history, trace) via GitLab Resource Events APIs:

- resource_state_events: State transitions (opened/closed/reopened/merged/locked)
  with actor tracking, source commit, and source MR references
- resource_label_events: Label add/remove history per entity
- resource_milestone_events: Milestone assignment changes per entity
- entity_references: Cross-reference table (Gate 2 prep) linking
  source/target entity pairs with reference type and discovery method
- pending_dependent_fetches: Generic job queue for resource_events,
  mr_closes_issues, and mr_diffs with exponential backoff retry

All event tables enforce entity exclusivity via CHECK constraints
(exactly one of issue_id or merge_request_id must be non-NULL).
Deduplication handled via UNIQUE indexes on (gitlab_id, project_id).
FK cascades ensure cleanup when parent entities are removed.

The dependent fetch queue uses a UNIQUE constraint on
(project_id, entity_type, entity_iid, job_type) for idempotent
enqueue, with partial indexes optimizing claim and retry queries.

Registered as migration 011 in the embedded MIGRATIONS array in db.rs.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-03 12:06:43 -05:00
18 changed files with 1227 additions and 21 deletions

File diff suppressed because one or more lines are too long


@@ -1 +1 @@
bd-1j1 bd-1m8


@@ -0,0 +1,128 @@
-- Migration 011: Resource event tables, entity references, and dependent fetch queue
-- Powers temporal queries (timeline, file-history, trace) via GitLab Resource Events APIs.
-- State change events (opened/closed/reopened/merged/locked)
CREATE TABLE resource_state_events (
id INTEGER PRIMARY KEY,
gitlab_id INTEGER NOT NULL,
project_id INTEGER NOT NULL REFERENCES projects(id) ON DELETE CASCADE,
issue_id INTEGER REFERENCES issues(id) ON DELETE CASCADE,
merge_request_id INTEGER REFERENCES merge_requests(id) ON DELETE CASCADE,
state TEXT NOT NULL,
actor_gitlab_id INTEGER,
actor_username TEXT,
created_at INTEGER NOT NULL, -- ms epoch UTC
source_commit TEXT,
source_merge_request_iid INTEGER, -- iid from source_merge_request ref
CHECK (
(issue_id IS NOT NULL AND merge_request_id IS NULL) OR
(issue_id IS NULL AND merge_request_id IS NOT NULL)
)
);
CREATE UNIQUE INDEX uq_state_events_gitlab ON resource_state_events(gitlab_id, project_id);
CREATE INDEX idx_state_events_issue ON resource_state_events(issue_id) WHERE issue_id IS NOT NULL;
CREATE INDEX idx_state_events_mr ON resource_state_events(merge_request_id) WHERE merge_request_id IS NOT NULL;
CREATE INDEX idx_state_events_created ON resource_state_events(created_at);
-- Label change events (add/remove)
CREATE TABLE resource_label_events (
id INTEGER PRIMARY KEY,
gitlab_id INTEGER NOT NULL,
project_id INTEGER NOT NULL REFERENCES projects(id) ON DELETE CASCADE,
issue_id INTEGER REFERENCES issues(id) ON DELETE CASCADE,
merge_request_id INTEGER REFERENCES merge_requests(id) ON DELETE CASCADE,
action TEXT NOT NULL CHECK (action IN ('add', 'remove')),
label_name TEXT NOT NULL,
actor_gitlab_id INTEGER,
actor_username TEXT,
created_at INTEGER NOT NULL, -- ms epoch UTC
CHECK (
(issue_id IS NOT NULL AND merge_request_id IS NULL) OR
(issue_id IS NULL AND merge_request_id IS NOT NULL)
)
);
CREATE UNIQUE INDEX uq_label_events_gitlab ON resource_label_events(gitlab_id, project_id);
CREATE INDEX idx_label_events_issue ON resource_label_events(issue_id) WHERE issue_id IS NOT NULL;
CREATE INDEX idx_label_events_mr ON resource_label_events(merge_request_id) WHERE merge_request_id IS NOT NULL;
CREATE INDEX idx_label_events_created ON resource_label_events(created_at);
-- Milestone change events (add/remove)
CREATE TABLE resource_milestone_events (
id INTEGER PRIMARY KEY,
gitlab_id INTEGER NOT NULL,
project_id INTEGER NOT NULL REFERENCES projects(id) ON DELETE CASCADE,
issue_id INTEGER REFERENCES issues(id) ON DELETE CASCADE,
merge_request_id INTEGER REFERENCES merge_requests(id) ON DELETE CASCADE,
action TEXT NOT NULL CHECK (action IN ('add', 'remove')),
milestone_title TEXT NOT NULL,
milestone_id INTEGER,
actor_gitlab_id INTEGER,
actor_username TEXT,
created_at INTEGER NOT NULL, -- ms epoch UTC
CHECK (
(issue_id IS NOT NULL AND merge_request_id IS NULL) OR
(issue_id IS NULL AND merge_request_id IS NOT NULL)
)
);
CREATE UNIQUE INDEX uq_milestone_events_gitlab ON resource_milestone_events(gitlab_id, project_id);
CREATE INDEX idx_milestone_events_issue ON resource_milestone_events(issue_id) WHERE issue_id IS NOT NULL;
CREATE INDEX idx_milestone_events_mr ON resource_milestone_events(merge_request_id) WHERE merge_request_id IS NOT NULL;
CREATE INDEX idx_milestone_events_created ON resource_milestone_events(created_at);
-- Cross-reference table (Gate 2): source/target entity pairs
CREATE TABLE entity_references (
id INTEGER PRIMARY KEY,
project_id INTEGER NOT NULL REFERENCES projects(id) ON DELETE CASCADE,
source_entity_type TEXT NOT NULL CHECK (source_entity_type IN ('issue', 'merge_request')),
source_entity_id INTEGER NOT NULL, -- local DB id
target_entity_type TEXT NOT NULL CHECK (target_entity_type IN ('issue', 'merge_request')),
target_entity_id INTEGER, -- local DB id (NULL if unresolved)
target_project_path TEXT, -- for unresolved cross-project refs
target_entity_iid INTEGER, -- for unresolved refs
reference_type TEXT NOT NULL CHECK (reference_type IN ('closes', 'mentioned', 'related')),
source_method TEXT NOT NULL CHECK (source_method IN ('api', 'note_parse', 'description_parse')),
created_at INTEGER NOT NULL -- ms epoch UTC
);
CREATE UNIQUE INDEX uq_entity_refs ON entity_references(
project_id,
source_entity_type,
source_entity_id,
target_entity_type,
COALESCE(target_entity_id, -1),
COALESCE(target_project_path, ''),
COALESCE(target_entity_iid, -1),
reference_type,
source_method
);
CREATE INDEX idx_entity_refs_source ON entity_references(source_entity_type, source_entity_id);
CREATE INDEX idx_entity_refs_target ON entity_references(target_entity_id) WHERE target_entity_id IS NOT NULL;
CREATE INDEX idx_entity_refs_unresolved ON entity_references(target_project_path, target_entity_iid) WHERE target_entity_id IS NULL;
-- Generic dependent fetch queue (resource_events, mr_closes_issues, mr_diffs)
CREATE TABLE pending_dependent_fetches (
id INTEGER PRIMARY KEY,
project_id INTEGER NOT NULL REFERENCES projects(id) ON DELETE CASCADE,
entity_type TEXT NOT NULL CHECK (entity_type IN ('issue', 'merge_request')),
entity_iid INTEGER NOT NULL,
entity_local_id INTEGER NOT NULL,
job_type TEXT NOT NULL CHECK (job_type IN ('resource_events', 'mr_closes_issues', 'mr_diffs')),
payload_json TEXT, -- optional extra data for the job
enqueued_at INTEGER NOT NULL, -- ms epoch UTC
locked_at INTEGER, -- ms epoch UTC (NULL = available)
attempts INTEGER NOT NULL DEFAULT 0,
next_retry_at INTEGER, -- ms epoch UTC (NULL = no backoff)
last_error TEXT
);
CREATE UNIQUE INDEX uq_pending_fetches ON pending_dependent_fetches(project_id, entity_type, entity_iid, job_type);
CREATE INDEX idx_pending_fetches_claimable ON pending_dependent_fetches(job_type, locked_at) WHERE locked_at IS NULL;
CREATE INDEX idx_pending_fetches_retryable ON pending_dependent_fetches(next_retry_at) WHERE locked_at IS NULL AND next_retry_at IS NOT NULL;
-- Update schema version
INSERT INTO schema_version (version, applied_at, description)
VALUES (11, strftime('%s', 'now') * 1000, 'Resource events, entity references, and dependent fetch queue');


@@ -7,6 +7,7 @@ use serde::Serialize;
use crate::Config;
use crate::core::db::create_connection;
use crate::core::error::Result;
use crate::core::events_db::{self, EventCounts};
use crate::core::paths::get_db_path;
/// Result of count query.
@@ -237,6 +238,109 @@ struct CountJsonBreakdown {
locked: Option<i64>,
}
/// Run the event count query.
pub fn run_count_events(config: &Config) -> Result<EventCounts> {
let db_path = get_db_path(config.storage.db_path.as_deref());
let conn = create_connection(&db_path)?;
events_db::count_events(&conn)
}
/// JSON output structure for event counts.
#[derive(Serialize)]
struct EventCountJsonOutput {
ok: bool,
data: EventCountJsonData,
}
#[derive(Serialize)]
struct EventCountJsonData {
state_events: EventTypeCounts,
label_events: EventTypeCounts,
milestone_events: EventTypeCounts,
total: usize,
}
#[derive(Serialize)]
struct EventTypeCounts {
issue: usize,
merge_request: usize,
total: usize,
}
/// Print event counts as JSON (robot mode).
pub fn print_event_count_json(counts: &EventCounts) {
let output = EventCountJsonOutput {
ok: true,
data: EventCountJsonData {
state_events: EventTypeCounts {
issue: counts.state_issue,
merge_request: counts.state_mr,
total: counts.state_issue + counts.state_mr,
},
label_events: EventTypeCounts {
issue: counts.label_issue,
merge_request: counts.label_mr,
total: counts.label_issue + counts.label_mr,
},
milestone_events: EventTypeCounts {
issue: counts.milestone_issue,
merge_request: counts.milestone_mr,
total: counts.milestone_issue + counts.milestone_mr,
},
total: counts.total(),
},
};
println!("{}", serde_json::to_string(&output).unwrap());
}
/// Print event counts (human-readable).
pub fn print_event_count(counts: &EventCounts) {
println!(
"{:<20} {:>8} {:>8} {:>8}",
style("Event Type").cyan().bold(),
style("Issues").bold(),
style("MRs").bold(),
style("Total").bold()
);
let state_total = counts.state_issue + counts.state_mr;
let label_total = counts.label_issue + counts.label_mr;
let milestone_total = counts.milestone_issue + counts.milestone_mr;
println!(
"{:<20} {:>8} {:>8} {:>8}",
"State events",
format_number(counts.state_issue as i64),
format_number(counts.state_mr as i64),
format_number(state_total as i64)
);
println!(
"{:<20} {:>8} {:>8} {:>8}",
"Label events",
format_number(counts.label_issue as i64),
format_number(counts.label_mr as i64),
format_number(label_total as i64)
);
println!(
"{:<20} {:>8} {:>8} {:>8}",
"Milestone events",
format_number(counts.milestone_issue as i64),
format_number(counts.milestone_mr as i64),
format_number(milestone_total as i64)
);
let total_issues = counts.state_issue + counts.label_issue + counts.milestone_issue;
let total_mrs = counts.state_mr + counts.label_mr + counts.milestone_mr;
println!(
"{:<20} {:>8} {:>8} {:>8}",
style("Total").bold(),
format_number(total_issues as i64),
format_number(total_mrs as i64),
style(format_number(counts.total() as i64)).bold()
);
}
/// Print count result as JSON (robot mode).
pub fn print_count_json(result: &CountResult) {
let breakdown = result.state_breakdown.as_ref().map(|b| CountJsonBreakdown {


@@ -15,7 +15,10 @@ pub mod sync;
pub mod sync_status;
pub use auth_test::run_auth_test;
pub use count::{
    print_count, print_count_json, print_event_count, print_event_count_json, run_count,
    run_count_events,
};
pub use doctor::{print_doctor_results, run_doctor};
pub use embed::{print_embed, print_embed_json, run_embed};
pub use generate_docs::{print_generate_docs, print_generate_docs_json, run_generate_docs};


@@ -47,6 +47,9 @@ pub struct QueueStats {
pub dirty_sources_failed: i64,
pub pending_discussion_fetches: i64,
pub pending_discussion_fetches_failed: i64,
pub pending_dependent_fetches: i64,
pub pending_dependent_fetches_failed: i64,
pub pending_dependent_fetches_stuck: i64,
}
#[derive(Debug, Default, Serialize)]
@@ -55,6 +58,11 @@ pub struct IntegrityResult {
pub fts_doc_mismatch: bool,
pub orphan_embeddings: i64,
pub stale_metadata: i64,
pub orphan_state_events: i64,
pub orphan_label_events: i64,
pub orphan_milestone_events: i64,
pub queue_stuck_locks: i64,
pub queue_max_attempts: i64,
#[serde(skip_serializing_if = "Option::is_none")]
pub repair: Option<RepairResult>,
}
@@ -127,6 +135,21 @@ pub fn run_stats(
)?;
}
if table_exists(&conn, "pending_dependent_fetches") {
result.queues.pending_dependent_fetches = count_query(
&conn,
"SELECT COUNT(*) FROM pending_dependent_fetches WHERE last_error IS NULL",
)?;
result.queues.pending_dependent_fetches_failed = count_query(
&conn,
"SELECT COUNT(*) FROM pending_dependent_fetches WHERE last_error IS NOT NULL",
)?;
result.queues.pending_dependent_fetches_stuck = count_query(
&conn,
"SELECT COUNT(*) FROM pending_dependent_fetches WHERE locked_at IS NOT NULL",
)?;
}
// Integrity check
if check {
let mut integrity = IntegrityResult::default();
@@ -153,9 +176,52 @@ pub fn run_stats(
)?;
}
// Orphaned resource events (FK targets missing)
if table_exists(&conn, "resource_state_events") {
integrity.orphan_state_events = count_query(
&conn,
"SELECT COUNT(*) FROM resource_state_events rse
WHERE (rse.issue_id IS NOT NULL AND NOT EXISTS (SELECT 1 FROM issues i WHERE i.id = rse.issue_id))
OR (rse.merge_request_id IS NOT NULL AND NOT EXISTS (SELECT 1 FROM merge_requests m WHERE m.id = rse.merge_request_id))",
)?;
}
if table_exists(&conn, "resource_label_events") {
integrity.orphan_label_events = count_query(
&conn,
"SELECT COUNT(*) FROM resource_label_events rle
WHERE (rle.issue_id IS NOT NULL AND NOT EXISTS (SELECT 1 FROM issues i WHERE i.id = rle.issue_id))
OR (rle.merge_request_id IS NOT NULL AND NOT EXISTS (SELECT 1 FROM merge_requests m WHERE m.id = rle.merge_request_id))",
)?;
}
if table_exists(&conn, "resource_milestone_events") {
integrity.orphan_milestone_events = count_query(
&conn,
"SELECT COUNT(*) FROM resource_milestone_events rme
WHERE (rme.issue_id IS NOT NULL AND NOT EXISTS (SELECT 1 FROM issues i WHERE i.id = rme.issue_id))
OR (rme.merge_request_id IS NOT NULL AND NOT EXISTS (SELECT 1 FROM merge_requests m WHERE m.id = rme.merge_request_id))",
)?;
}
// Queue health: stuck locks and max retry attempts
if table_exists(&conn, "pending_dependent_fetches") {
integrity.queue_stuck_locks = count_query(
&conn,
"SELECT COUNT(*) FROM pending_dependent_fetches WHERE locked_at IS NOT NULL",
)?;
integrity.queue_max_attempts = count_query(
&conn,
"SELECT COALESCE(MAX(attempts), 0) FROM pending_dependent_fetches",
)?;
}
let orphan_events = integrity.orphan_state_events
+ integrity.orphan_label_events
+ integrity.orphan_milestone_events;
integrity.ok = !integrity.fts_doc_mismatch
&& integrity.orphan_embeddings == 0
&& integrity.stale_metadata == 0
&& orphan_events == 0;
// Repair
if repair {
@@ -260,6 +326,17 @@ pub fn print_stats(result: &StatsResult) {
result.queues.pending_discussion_fetches,
result.queues.pending_discussion_fetches_failed
);
if result.queues.pending_dependent_fetches > 0
|| result.queues.pending_dependent_fetches_failed > 0
|| result.queues.pending_dependent_fetches_stuck > 0
{
println!(
" Dependent fetch: {} pending, {} failed, {} stuck",
result.queues.pending_dependent_fetches,
result.queues.pending_dependent_fetches_failed,
result.queues.pending_dependent_fetches_stuck
);
}
if let Some(ref integrity) = result.integrity {
println!();
@@ -287,6 +364,33 @@ pub fn print_stats(result: &StatsResult) {
integrity.stale_metadata
);
}
let orphan_events = integrity.orphan_state_events
+ integrity.orphan_label_events
+ integrity.orphan_milestone_events;
if orphan_events > 0 {
println!(
" {} {} orphan resource events (state: {}, label: {}, milestone: {})",
style("!").red(),
orphan_events,
integrity.orphan_state_events,
integrity.orphan_label_events,
integrity.orphan_milestone_events
);
}
if integrity.queue_stuck_locks > 0 {
println!(
" {} {} stuck queue locks",
style("!").yellow(),
integrity.queue_stuck_locks
);
}
if integrity.queue_max_attempts > 3 {
println!(
" {} max queue retry attempts: {}",
style("!").yellow(),
integrity.queue_max_attempts
);
}
if let Some(ref repair) = integrity.repair {
println!();
@@ -336,6 +440,11 @@ pub fn print_stats_json(result: &StatsResult) {
fts_doc_mismatch: i.fts_doc_mismatch,
orphan_embeddings: i.orphan_embeddings,
stale_metadata: i.stale_metadata,
orphan_state_events: i.orphan_state_events,
orphan_label_events: i.orphan_label_events,
orphan_milestone_events: i.orphan_milestone_events,
queue_stuck_locks: i.queue_stuck_locks,
queue_max_attempts: i.queue_max_attempts,
repair: i.repair.as_ref().map(|r| RepairResult {
fts_rebuilt: r.fts_rebuilt,
orphans_deleted: r.orphans_deleted,


@@ -19,6 +19,7 @@ pub struct SyncOptions {
pub force: bool,
pub no_embed: bool,
pub no_docs: bool,
pub no_events: bool,
pub robot_mode: bool,
}


@@ -478,6 +478,10 @@ pub struct SyncArgs {
/// Skip document regeneration
#[arg(long)]
pub no_docs: bool,
/// Skip resource event fetching (overrides config)
#[arg(long = "no-events")]
pub no_events: bool,
}
/// Arguments for `lore embed`
@@ -501,8 +505,8 @@ pub struct EmbedArgs {
/// Arguments for `lore count <ENTITY>`
#[derive(Parser)]
pub struct CountArgs {
/// Entity type to count (issues, mrs, discussions, notes, events)
#[arg(value_parser = ["issues", "mrs", "discussions", "notes", "events"])]
pub entity: String,
/// Parent type filter: issue or mr (for discussions/notes)


@@ -50,6 +50,13 @@ pub struct SyncConfig {
#[serde(rename = "dependentConcurrency")]
pub dependent_concurrency: u32,
#[serde(rename = "fetchResourceEvents", default = "default_true")]
pub fetch_resource_events: bool,
}
fn default_true() -> bool {
true
}
impl Default for SyncConfig {
@@ -61,6 +68,7 @@ impl Default for SyncConfig {
cursor_rewind_seconds: 2,
primary_concurrency: 4,
dependent_concurrency: 2,
fetch_resource_events: true,
}
}
}


@@ -47,6 +47,10 @@ const MIGRATIONS: &[(&str, &str)] = &[
"010",
include_str!("../../migrations/010_chunk_config.sql"),
),
(
"011",
include_str!("../../migrations/011_resource_events.sql"),
),
];
/// Create a database connection with production-grade pragmas.

src/core/dependent_queue.rs Normal file

@@ -0,0 +1,177 @@
//! Generic dependent fetch queue for resource events, MR closes, and MR diffs.
//!
//! Provides enqueue, claim, complete, fail (with exponential backoff), and
//! stale lock reclamation operations against the `pending_dependent_fetches` table.
use std::collections::HashMap;
use rusqlite::Connection;
use super::error::Result;
use super::time::now_ms;
/// A pending job from the dependent fetch queue.
#[derive(Debug)]
pub struct PendingJob {
pub id: i64,
pub project_id: i64,
pub entity_type: String,
pub entity_iid: i64,
pub entity_local_id: i64,
pub job_type: String,
pub payload_json: Option<String>,
pub attempts: i32,
}
/// Enqueue a dependent fetch job. Idempotent via UNIQUE constraint (INSERT OR IGNORE).
///
/// Returns `true` if actually inserted (not deduped).
pub fn enqueue_job(
conn: &Connection,
project_id: i64,
entity_type: &str,
entity_iid: i64,
entity_local_id: i64,
job_type: &str,
payload_json: Option<&str>,
) -> Result<bool> {
let now = now_ms();
let changes = conn.execute(
"INSERT OR IGNORE INTO pending_dependent_fetches
(project_id, entity_type, entity_iid, entity_local_id, job_type, payload_json, enqueued_at)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
rusqlite::params![project_id, entity_type, entity_iid, entity_local_id, job_type, payload_json, now],
)?;
Ok(changes > 0)
}
/// Claim a batch of jobs for processing.
///
/// Atomically sets `locked_at` on the claimed jobs. Only claims jobs where
/// `locked_at IS NULL` and `(next_retry_at IS NULL OR next_retry_at <= now)`.
pub fn claim_jobs(conn: &Connection, job_type: &str, batch_size: usize) -> Result<Vec<PendingJob>> {
if batch_size == 0 {
return Ok(Vec::new());
}
let now = now_ms();
// Find available jobs
let mut select_stmt = conn.prepare_cached(
"SELECT id, project_id, entity_type, entity_iid, entity_local_id, job_type, payload_json, attempts
FROM pending_dependent_fetches
WHERE job_type = ?1
AND locked_at IS NULL
AND (next_retry_at IS NULL OR next_retry_at <= ?2)
ORDER BY enqueued_at ASC
LIMIT ?3",
)?;
let jobs: Vec<PendingJob> = select_stmt
.query_map(
rusqlite::params![job_type, now, batch_size as i64],
|row| {
Ok(PendingJob {
id: row.get(0)?,
project_id: row.get(1)?,
entity_type: row.get(2)?,
entity_iid: row.get(3)?,
entity_local_id: row.get(4)?,
job_type: row.get(5)?,
payload_json: row.get(6)?,
attempts: row.get(7)?,
})
},
)?
.collect::<std::result::Result<Vec<_>, _>>()?;
// Lock the claimed jobs
if !jobs.is_empty() {
let ids: Vec<String> = jobs.iter().map(|j| j.id.to_string()).collect();
let placeholders = ids.join(",");
conn.execute(
&format!(
"UPDATE pending_dependent_fetches SET locked_at = ?1 WHERE id IN ({placeholders})"
),
rusqlite::params![now],
)?;
}
Ok(jobs)
}
/// Mark a job as complete (DELETE the row).
pub fn complete_job(conn: &Connection, job_id: i64) -> Result<()> {
conn.execute(
"DELETE FROM pending_dependent_fetches WHERE id = ?1",
rusqlite::params![job_id],
)?;
Ok(())
}
/// Mark a job as failed. Increments attempts, sets next_retry_at with exponential
/// backoff, clears locked_at, and records the error.
///
/// Backoff: 30s * 2^(attempts-1), capped at 480s.
pub fn fail_job(conn: &Connection, job_id: i64, error: &str) -> Result<()> {
let now = now_ms();
// Get current attempts
let current_attempts: i32 = conn
.query_row(
"SELECT attempts FROM pending_dependent_fetches WHERE id = ?1",
rusqlite::params![job_id],
|row| row.get(0),
)
.unwrap_or(0);
let new_attempts = current_attempts + 1;
let backoff_ms: i64 = (30_000i64 * (1i64 << (new_attempts - 1).min(4))).min(480_000);
let next_retry = now + backoff_ms;
conn.execute(
"UPDATE pending_dependent_fetches
SET attempts = ?1, next_retry_at = ?2, locked_at = NULL, last_error = ?3
WHERE id = ?4",
rusqlite::params![new_attempts, next_retry, error, job_id],
)?;
Ok(())
}
/// Reclaim stale locks (locked_at older than threshold).
///
/// Returns count of reclaimed jobs.
pub fn reclaim_stale_locks(conn: &Connection, stale_threshold_minutes: u32) -> Result<usize> {
let threshold_ms = now_ms() - (i64::from(stale_threshold_minutes) * 60 * 1000);
let changes = conn.execute(
"UPDATE pending_dependent_fetches SET locked_at = NULL WHERE locked_at < ?1",
rusqlite::params![threshold_ms],
)?;
Ok(changes)
}
/// Count pending jobs by job_type (for stats/progress).
pub fn count_pending_jobs(conn: &Connection) -> Result<HashMap<String, usize>> {
let mut stmt = conn.prepare_cached(
"SELECT job_type, COUNT(*) FROM pending_dependent_fetches GROUP BY job_type",
)?;
let mut counts = HashMap::new();
let rows = stmt.query_map([], |row| {
let job_type: String = row.get(0)?;
let count: i64 = row.get(1)?;
Ok((job_type, count as usize))
})?;
for row in rows {
let (job_type, count) = row?;
counts.insert(job_type, count);
}
Ok(counts)
}
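The retry schedule encoded in `fail_job` doubles a 30-second base per attempt and caps at 480 seconds. A minimal sketch of the same arithmetic, using a hypothetical `backoff_ms` helper that is not part of the module:

```rust
/// Hypothetical mirror of the backoff arithmetic in `fail_job`:
/// 30s * 2^(attempts - 1), exponent clamped at 4, result capped at 480s.
fn backoff_ms(attempts: i32) -> i64 {
    (30_000i64 * (1i64 << (attempts - 1).min(4))).min(480_000)
}

fn main() {
    // attempts 1..=6 yield 30s, 60s, 120s, 240s, 480s, 480s
    for attempts in 1..=6 {
        println!("attempt {attempts}: {} ms", backoff_ms(attempts));
    }
}
```

The exponent clamp and the final `.min(480_000)` are redundant with each other, but together they make the cap robust against either constant changing alone.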

src/core/events_db.rs (new file)

@@ -0,0 +1,232 @@
//! Database upsert functions for resource events (state, label, milestone).
use rusqlite::Connection;
use super::error::{LoreError, Result};
use super::time::iso_to_ms_strict;
use crate::gitlab::types::{GitLabLabelEvent, GitLabMilestoneEvent, GitLabStateEvent};
/// Upsert state events for an entity.
///
/// Uses INSERT OR REPLACE keyed on UNIQUE(gitlab_id, project_id).
/// Wraps in a savepoint for atomicity per entity.
pub fn upsert_state_events(
conn: &mut Connection,
project_id: i64,
entity_type: &str,
entity_local_id: i64,
events: &[GitLabStateEvent],
) -> Result<usize> {
let (issue_id, merge_request_id) = resolve_entity_ids(entity_type, entity_local_id)?;
let sp = conn.savepoint()?;
let mut stmt = sp.prepare_cached(
"INSERT OR REPLACE INTO resource_state_events
(gitlab_id, project_id, issue_id, merge_request_id, state,
actor_gitlab_id, actor_username, created_at,
source_commit, source_merge_request_iid)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10)",
)?;
let mut count = 0;
for event in events {
let created_at = iso_to_ms_strict(&event.created_at).map_err(LoreError::Other)?;
let actor_id = event.user.as_ref().map(|u| u.id);
let actor_username = event.user.as_ref().map(|u| u.username.as_str());
let source_mr_iid = event.source_merge_request.as_ref().map(|mr| mr.iid);
stmt.execute(rusqlite::params![
event.id,
project_id,
issue_id,
merge_request_id,
event.state,
actor_id,
actor_username,
created_at,
event.source_commit,
source_mr_iid,
])?;
count += 1;
}
drop(stmt);
sp.commit()?;
Ok(count)
}
/// Upsert label events for an entity.
pub fn upsert_label_events(
conn: &mut Connection,
project_id: i64,
entity_type: &str,
entity_local_id: i64,
events: &[GitLabLabelEvent],
) -> Result<usize> {
let (issue_id, merge_request_id) = resolve_entity_ids(entity_type, entity_local_id)?;
let sp = conn.savepoint()?;
let mut stmt = sp.prepare_cached(
"INSERT OR REPLACE INTO resource_label_events
(gitlab_id, project_id, issue_id, merge_request_id, action,
label_name, actor_gitlab_id, actor_username, created_at)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)",
)?;
let mut count = 0;
for event in events {
let created_at = iso_to_ms_strict(&event.created_at).map_err(LoreError::Other)?;
let actor_id = event.user.as_ref().map(|u| u.id);
let actor_username = event.user.as_ref().map(|u| u.username.as_str());
stmt.execute(rusqlite::params![
event.id,
project_id,
issue_id,
merge_request_id,
event.action,
event.label.name,
actor_id,
actor_username,
created_at,
])?;
count += 1;
}
drop(stmt);
sp.commit()?;
Ok(count)
}
/// Upsert milestone events for an entity.
pub fn upsert_milestone_events(
conn: &mut Connection,
project_id: i64,
entity_type: &str,
entity_local_id: i64,
events: &[GitLabMilestoneEvent],
) -> Result<usize> {
let (issue_id, merge_request_id) = resolve_entity_ids(entity_type, entity_local_id)?;
let sp = conn.savepoint()?;
let mut stmt = sp.prepare_cached(
"INSERT OR REPLACE INTO resource_milestone_events
(gitlab_id, project_id, issue_id, merge_request_id, action,
milestone_title, milestone_id, actor_gitlab_id, actor_username, created_at)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10)",
)?;
let mut count = 0;
for event in events {
let created_at = iso_to_ms_strict(&event.created_at).map_err(LoreError::Other)?;
let actor_id = event.user.as_ref().map(|u| u.id);
let actor_username = event.user.as_ref().map(|u| u.username.as_str());
stmt.execute(rusqlite::params![
event.id,
project_id,
issue_id,
merge_request_id,
event.action,
event.milestone.title,
event.milestone.id,
actor_id,
actor_username,
created_at,
])?;
count += 1;
}
drop(stmt);
sp.commit()?;
Ok(count)
}
/// Resolve entity type string to (issue_id, merge_request_id) pair.
/// Exactly one is Some, the other is None.
fn resolve_entity_ids(entity_type: &str, entity_local_id: i64) -> Result<(Option<i64>, Option<i64>)> {
match entity_type {
"issue" => Ok((Some(entity_local_id), None)),
"merge_request" => Ok((None, Some(entity_local_id))),
_ => Err(LoreError::Other(format!(
"Invalid entity type for resource events: {entity_type}"
))),
}
}
/// Count resource events by type for the count command.
pub fn count_events(conn: &Connection) -> Result<EventCounts> {
let mut counts = EventCounts::default();
// State events
let row: (i64, i64) = conn
.query_row(
"SELECT
COUNT(CASE WHEN issue_id IS NOT NULL THEN 1 END),
COUNT(CASE WHEN merge_request_id IS NOT NULL THEN 1 END)
FROM resource_state_events",
[],
|row| Ok((row.get(0)?, row.get(1)?)),
)
.unwrap_or((0, 0));
counts.state_issue = row.0 as usize;
counts.state_mr = row.1 as usize;
// Label events
let row: (i64, i64) = conn
.query_row(
"SELECT
COUNT(CASE WHEN issue_id IS NOT NULL THEN 1 END),
COUNT(CASE WHEN merge_request_id IS NOT NULL THEN 1 END)
FROM resource_label_events",
[],
|row| Ok((row.get(0)?, row.get(1)?)),
)
.unwrap_or((0, 0));
counts.label_issue = row.0 as usize;
counts.label_mr = row.1 as usize;
// Milestone events
let row: (i64, i64) = conn
.query_row(
"SELECT
COUNT(CASE WHEN issue_id IS NOT NULL THEN 1 END),
COUNT(CASE WHEN merge_request_id IS NOT NULL THEN 1 END)
FROM resource_milestone_events",
[],
|row| Ok((row.get(0)?, row.get(1)?)),
)
.unwrap_or((0, 0));
counts.milestone_issue = row.0 as usize;
counts.milestone_mr = row.1 as usize;
Ok(counts)
}
/// Event counts broken down by type and entity.
#[derive(Debug, Default)]
pub struct EventCounts {
pub state_issue: usize,
pub state_mr: usize,
pub label_issue: usize,
pub label_mr: usize,
pub milestone_issue: usize,
pub milestone_mr: usize,
}
impl EventCounts {
pub fn total(&self) -> usize {
self.state_issue
+ self.state_mr
+ self.label_issue
+ self.label_mr
+ self.milestone_issue
+ self.milestone_mr
}
}


@@ -3,7 +3,9 @@
pub mod backoff;
pub mod config;
pub mod db;
pub mod dependent_queue;
pub mod error;
pub mod events_db;
pub mod lock;
pub mod paths;
pub mod payloads;


@@ -13,7 +13,8 @@ use tokio::time::sleep;
use tracing::debug;
use super::types::{
GitLabDiscussion, GitLabIssue, GitLabLabelEvent, GitLabMergeRequest, GitLabMilestoneEvent,
GitLabProject, GitLabStateEvent, GitLabUser, GitLabVersion,
};
use crate::core::error::{LoreError, Result};
@@ -550,6 +551,152 @@ impl GitLabClient {
}
}
/// Resource events API methods.
///
/// These endpoints return per-entity events (not project-wide), so they collect
/// all pages into a Vec rather than using streaming.
impl GitLabClient {
/// Fetch all pages from a paginated endpoint, returning collected results.
async fn fetch_all_pages<T: serde::de::DeserializeOwned>(
&self,
path: &str,
) -> Result<Vec<T>> {
let mut results = Vec::new();
let mut page = 1u32;
let per_page = 100u32;
loop {
let params = vec![
("per_page", per_page.to_string()),
("page", page.to_string()),
];
let (items, headers) = self
.request_with_headers::<Vec<T>>(path, &params)
.await?;
let is_empty = items.is_empty();
let full_page = items.len() as u32 == per_page;
results.extend(items);
let next_page = headers
.get("x-next-page")
.and_then(|v| v.to_str().ok())
.and_then(|s| s.parse::<u32>().ok());
match next_page {
Some(next) if next > page => page = next,
_ => {
if is_empty || !full_page {
break;
}
page += 1;
}
}
}
Ok(results)
}
/// Fetch state events for an issue.
pub async fn fetch_issue_state_events(
&self,
gitlab_project_id: i64,
iid: i64,
) -> Result<Vec<GitLabStateEvent>> {
let path = format!(
"/api/v4/projects/{gitlab_project_id}/issues/{iid}/resource_state_events"
);
self.fetch_all_pages(&path).await
}
/// Fetch label events for an issue.
pub async fn fetch_issue_label_events(
&self,
gitlab_project_id: i64,
iid: i64,
) -> Result<Vec<GitLabLabelEvent>> {
let path = format!(
"/api/v4/projects/{gitlab_project_id}/issues/{iid}/resource_label_events"
);
self.fetch_all_pages(&path).await
}
/// Fetch milestone events for an issue.
pub async fn fetch_issue_milestone_events(
&self,
gitlab_project_id: i64,
iid: i64,
) -> Result<Vec<GitLabMilestoneEvent>> {
let path = format!(
"/api/v4/projects/{gitlab_project_id}/issues/{iid}/resource_milestone_events"
);
self.fetch_all_pages(&path).await
}
/// Fetch state events for a merge request.
pub async fn fetch_mr_state_events(
&self,
gitlab_project_id: i64,
iid: i64,
) -> Result<Vec<GitLabStateEvent>> {
let path = format!(
"/api/v4/projects/{gitlab_project_id}/merge_requests/{iid}/resource_state_events"
);
self.fetch_all_pages(&path).await
}
/// Fetch label events for a merge request.
pub async fn fetch_mr_label_events(
&self,
gitlab_project_id: i64,
iid: i64,
) -> Result<Vec<GitLabLabelEvent>> {
let path = format!(
"/api/v4/projects/{gitlab_project_id}/merge_requests/{iid}/resource_label_events"
);
self.fetch_all_pages(&path).await
}
/// Fetch milestone events for a merge request.
pub async fn fetch_mr_milestone_events(
&self,
gitlab_project_id: i64,
iid: i64,
) -> Result<Vec<GitLabMilestoneEvent>> {
let path = format!(
"/api/v4/projects/{gitlab_project_id}/merge_requests/{iid}/resource_milestone_events"
);
self.fetch_all_pages(&path).await
}
/// Fetch all three event types for an entity in one call.
pub async fn fetch_all_resource_events(
&self,
gitlab_project_id: i64,
entity_type: &str,
iid: i64,
) -> Result<(Vec<GitLabStateEvent>, Vec<GitLabLabelEvent>, Vec<GitLabMilestoneEvent>)> {
match entity_type {
"issue" => {
let state = self.fetch_issue_state_events(gitlab_project_id, iid).await?;
let label = self.fetch_issue_label_events(gitlab_project_id, iid).await?;
let milestone = self.fetch_issue_milestone_events(gitlab_project_id, iid).await?;
Ok((state, label, milestone))
}
"merge_request" => {
let state = self.fetch_mr_state_events(gitlab_project_id, iid).await?;
let label = self.fetch_mr_label_events(gitlab_project_id, iid).await?;
let milestone = self.fetch_mr_milestone_events(gitlab_project_id, iid).await?;
Ok((state, label, milestone))
}
_ => Err(LoreError::Other(format!(
"Invalid entity type for resource events: {entity_type}"
))),
}
}
}
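The stopping rule in `fetch_all_pages` can be distilled into a pure function: follow the `x-next-page` header when it advances, and otherwise stop on an empty or short page. A sketch under that reading (`next_page` is a hypothetical helper, not part of the client):

```rust
/// Hypothetical distillation of the paging decision in `fetch_all_pages`:
/// returns the next page to request, or None when pagination is done.
fn next_page(current: u32, items_len: u32, per_page: u32, header_next: Option<u32>) -> Option<u32> {
    match header_next {
        // Trust the x-next-page header when it actually advances.
        Some(next) if next > current => Some(next),
        _ => {
            // No usable header: an empty or short page means we are done.
            if items_len == 0 || items_len < per_page {
                None
            } else {
                Some(current + 1)
            }
        }
    }
}

fn main() {
    assert_eq!(next_page(1, 100, 100, Some(2)), Some(2)); // header drives paging
    assert_eq!(next_page(2, 40, 100, None), None);        // short page: stop
    assert_eq!(next_page(1, 100, 100, None), Some(2));    // full page, no header
    println!("pagination rules hold");
}
```

The page-size fallback matters because some proxies strip pagination headers; without it a missing header on a full page would end the fetch one page early.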
/// Page result for merge request pagination.
#[derive(Debug)]
pub struct MergeRequestPage {


@@ -10,6 +10,7 @@ pub use transformers::{
transform_discussion, transform_issue, transform_notes,
};
pub use types::{
GitLabAuthor, GitLabDiscussion, GitLabIssue, GitLabLabelEvent, GitLabLabelRef,
GitLabMergeRequestRef, GitLabMilestoneEvent, GitLabMilestoneRef, GitLabNote,
GitLabNotePosition, GitLabProject, GitLabStateEvent, GitLabUser, GitLabVersion,
};


@@ -182,6 +182,70 @@ impl GitLabLineRange {
}
}
// === Resource Event types (Phase B - Gate 1) ===
/// Reference to an MR in state event's source_merge_request field.
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct GitLabMergeRequestRef {
pub iid: i64,
pub title: Option<String>,
pub web_url: Option<String>,
}
/// Reference to a label in label event's label field.
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct GitLabLabelRef {
pub id: i64,
pub name: String,
pub color: Option<String>,
pub description: Option<String>,
}
/// Reference to a milestone in milestone event's milestone field.
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct GitLabMilestoneRef {
pub id: i64,
pub iid: i64,
pub title: String,
}
/// State change event from the Resource State Events API.
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct GitLabStateEvent {
pub id: i64,
pub user: Option<GitLabAuthor>,
pub created_at: String,
pub resource_type: String,
pub resource_id: i64,
pub state: String,
pub source_commit: Option<String>,
pub source_merge_request: Option<GitLabMergeRequestRef>,
}
/// Label change event from the Resource Label Events API.
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct GitLabLabelEvent {
pub id: i64,
pub user: Option<GitLabAuthor>,
pub created_at: String,
pub resource_type: String,
pub resource_id: i64,
pub label: GitLabLabelRef,
pub action: String,
}
/// Milestone change event from the Resource Milestone Events API.
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct GitLabMilestoneEvent {
pub id: i64,
pub user: Option<GitLabAuthor>,
pub created_at: String,
pub resource_type: String,
pub resource_id: i64,
pub milestone: GitLabMilestoneRef,
pub action: String,
}
// === Checkpoint 2: Merge Request types ===
/// GitLab MR references (short and full reference strings).


@@ -11,14 +11,14 @@ use tracing_subscriber::util::SubscriberInitExt;
use lore::Config;
use lore::cli::commands::{
InitInputs, InitOptions, InitResult, ListFilters, MrListFilters, SearchCliFilters, open_issue_in_browser,
open_mr_in_browser, print_count, print_count_json, print_event_count, print_event_count_json, print_doctor_results, print_generate_docs,
print_generate_docs_json, print_ingest_summary, print_ingest_summary_json, print_list_issues,
print_list_issues_json, print_list_mrs, print_list_mrs_json, print_search_results,
print_search_results_json, print_show_issue, print_show_issue_json, print_show_mr, print_stats,
print_stats_json,
print_embed, print_embed_json, print_sync, print_sync_json,
print_show_mr_json, print_sync_status, print_sync_status_json, run_auth_test, run_count,
run_count_events, run_doctor, run_embed, run_generate_docs, run_ingest, run_init, run_list_issues, run_list_mrs,
run_search, run_show_issue, run_show_mr, run_stats, run_sync, run_sync_status, SyncOptions,
IngestDisplay,
};
@@ -518,6 +518,16 @@ async fn handle_count(
) -> Result<(), Box<dyn std::error::Error>> {
let config = Config::load(config_override)?;
if args.entity == "events" {
let counts = run_count_events(&config)?;
if robot_mode {
print_event_count_json(&counts);
} else {
print_event_count(&counts);
}
return Ok(());
}
let result = run_count(&config, &args.entity, args.for_entity.as_deref())?;
if robot_mode {
print_count_json(&result);
@@ -1128,12 +1138,16 @@ async fn handle_sync_cmd(
args: SyncArgs,
robot_mode: bool,
) -> Result<(), Box<dyn std::error::Error>> {
let mut config = Config::load(config_override)?;
if args.no_events {
config.sync.fetch_resource_events = false;
}
let options = SyncOptions {
full: args.full && !args.no_full,
force: args.force && !args.no_force,
no_embed: args.no_embed,
no_docs: args.no_docs,
no_events: args.no_events,
robot_mode,
};


@@ -1,8 +1,10 @@
//! Tests for GitLab API response type deserialization.
use lore::gitlab::types::{
GitLabAuthor, GitLabDiscussion, GitLabIssue, GitLabLabelEvent, GitLabLabelRef,
GitLabMergeRequest, GitLabMergeRequestRef, GitLabMilestone, GitLabMilestoneEvent,
GitLabMilestoneRef, GitLabNote, GitLabNotePosition, GitLabReferences, GitLabReviewer,
GitLabStateEvent,
};
#[test]
@@ -637,3 +639,209 @@ fn deserializes_diffnote_position_with_line_range() {
assert_eq!(range.start_line(), Some(10));
assert_eq!(range.end_line(), Some(15));
}
// === Resource Event type tests ===
#[test]
fn deserializes_state_event_closed_by_mr() {
let json = r#"{
"id": 1001,
"user": {
"id": 42,
"username": "developer",
"name": "Dev User"
},
"created_at": "2024-03-15T10:30:00.000Z",
"resource_type": "Issue",
"resource_id": 555,
"state": "closed",
"source_commit": null,
"source_merge_request": {
"iid": 99,
"title": "Fix the bug",
"web_url": "https://gitlab.example.com/group/project/-/merge_requests/99"
}
}"#;
let event: GitLabStateEvent =
serde_json::from_str(json).expect("Failed to deserialize state event");
assert_eq!(event.id, 1001);
assert!(event.user.is_some());
assert_eq!(event.user.as_ref().unwrap().username, "developer");
assert_eq!(event.resource_type, "Issue");
assert_eq!(event.resource_id, 555);
assert_eq!(event.state, "closed");
assert!(event.source_commit.is_none());
assert!(event.source_merge_request.is_some());
let mr_ref = event.source_merge_request.unwrap();
assert_eq!(mr_ref.iid, 99);
assert_eq!(mr_ref.title, Some("Fix the bug".to_string()));
}
#[test]
fn deserializes_state_event_simple_no_user() {
let json = r#"{
"id": 1002,
"user": null,
"created_at": "2024-03-15T10:30:00.000Z",
"resource_type": "MergeRequest",
"resource_id": 777,
"state": "merged",
"source_commit": "abc123def456",
"source_merge_request": null
}"#;
let event: GitLabStateEvent =
serde_json::from_str(json).expect("Failed to deserialize state event without user");
assert_eq!(event.id, 1002);
assert!(event.user.is_none());
assert_eq!(event.resource_type, "MergeRequest");
assert_eq!(event.state, "merged");
assert_eq!(event.source_commit, Some("abc123def456".to_string()));
assert!(event.source_merge_request.is_none());
}
#[test]
fn deserializes_label_event_add() {
let json = r##"{
"id": 2001,
"user": {
"id": 42,
"username": "developer",
"name": "Dev User"
},
"created_at": "2024-03-15T10:30:00.000Z",
"resource_type": "Issue",
"resource_id": 555,
"label": {
"id": 100,
"name": "bug",
"color": "#FF0000",
"description": "Bug label"
},
"action": "add"
}"##;
let event: GitLabLabelEvent =
serde_json::from_str(json).expect("Failed to deserialize label event");
assert_eq!(event.id, 2001);
assert_eq!(event.action, "add");
assert_eq!(event.label.id, 100);
assert_eq!(event.label.name, "bug");
assert_eq!(event.label.color, Some("#FF0000".to_string()));
assert_eq!(event.label.description, Some("Bug label".to_string()));
}
#[test]
fn deserializes_label_event_remove_null_color() {
let json = r#"{
"id": 2002,
"user": {
"id": 42,
"username": "developer",
"name": "Dev User"
},
"created_at": "2024-03-15T10:30:00.000Z",
"resource_type": "MergeRequest",
"resource_id": 777,
"label": {
"id": 101,
"name": "needs-review",
"color": null,
"description": null
},
"action": "remove"
}"#;
let event: GitLabLabelEvent =
serde_json::from_str(json).expect("Failed to deserialize label remove event");
assert_eq!(event.action, "remove");
assert!(event.label.color.is_none());
assert!(event.label.description.is_none());
}
#[test]
fn deserializes_milestone_event() {
let json = r#"{
"id": 3001,
"user": {
"id": 42,
"username": "developer",
"name": "Dev User"
},
"created_at": "2024-03-15T10:30:00.000Z",
"resource_type": "Issue",
"resource_id": 555,
"milestone": {
"id": 200,
"iid": 5,
"title": "v1.0"
},
"action": "add"
}"#;
let event: GitLabMilestoneEvent =
serde_json::from_str(json).expect("Failed to deserialize milestone event");
assert_eq!(event.id, 3001);
assert_eq!(event.action, "add");
assert_eq!(event.milestone.id, 200);
assert_eq!(event.milestone.iid, 5);
assert_eq!(event.milestone.title, "v1.0");
}
#[test]
fn deserializes_merge_request_ref() {
let json = r#"{
"iid": 42,
"title": "Feature branch",
"web_url": "https://gitlab.example.com/group/project/-/merge_requests/42"
}"#;
let mr_ref: GitLabMergeRequestRef =
serde_json::from_str(json).expect("Failed to deserialize MR ref");
assert_eq!(mr_ref.iid, 42);
assert_eq!(mr_ref.title, Some("Feature branch".to_string()));
assert_eq!(
mr_ref.web_url,
Some("https://gitlab.example.com/group/project/-/merge_requests/42".to_string())
);
}
#[test]
fn deserializes_label_ref() {
let json = r##"{
"id": 100,
"name": "bug",
"color": "#FF0000",
"description": "Bug label"
}"##;
let label_ref: GitLabLabelRef =
serde_json::from_str(json).expect("Failed to deserialize label ref");
assert_eq!(label_ref.id, 100);
assert_eq!(label_ref.name, "bug");
assert_eq!(label_ref.color, Some("#FF0000".to_string()));
}
#[test]
fn deserializes_milestone_ref() {
let json = r#"{
"id": 200,
"iid": 5,
"title": "v1.0"
}"#;
let ms_ref: GitLabMilestoneRef =
serde_json::from_str(json).expect("Failed to deserialize milestone ref");
assert_eq!(ms_ref.id, 200);
assert_eq!(ms_ref.iid, 5);
assert_eq!(ms_ref.title, "v1.0");
}