1 Commits

Author SHA1 Message Date
Taylor Eernisse
98907ac666 feat(events): Implement Gate 1 resource events infrastructure
Add complete infrastructure for ingesting GitLab Resource Events
(state, label, milestone) into local SQLite tables. This enables
temporal queries (timeline, file-history, trace) in later gates.

- Add migration 011: resource_state/label/milestone_events tables,
  entity_references table, pending_dependent_fetches queue
- Add 6 serde types for GitLab Resource Events API responses
- Add fetchResourceEvents config flag with --no-events CLI override
- Add fetch_all_pages<T> generic paginator and 6 API endpoint methods
- Add DB upsert functions with savepoint atomicity (events_db.rs)
- Add dependent fetch queue with exponential backoff (dependent_queue.rs)
- Add 'lore count events' command with human table and robot JSON output
- Extend 'lore stats --check' with event FK integrity and queue health
- Add 8 unit tests for resource event type deserialization

Closes: bd-hu3, bd-2e8, bd-2fm, bd-sqw, bd-1uc, bd-tir, bd-3sh, bd-1m8

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-03 11:23:44 -05:00
18 changed files with 1227 additions and 21 deletions

File diff suppressed because one or more lines are too long

View File

@@ -1 +1 @@
bd-1j1 bd-1m8

View File

@@ -0,0 +1,128 @@
-- Migration 011: Resource event tables, entity references, and dependent fetch queue
-- Powers temporal queries (timeline, file-history, trace) via GitLab Resource Events APIs.
-- State change events (opened/closed/reopened/merged/locked)
CREATE TABLE resource_state_events (
id INTEGER PRIMARY KEY,
gitlab_id INTEGER NOT NULL,
project_id INTEGER NOT NULL REFERENCES projects(id) ON DELETE CASCADE,
issue_id INTEGER REFERENCES issues(id) ON DELETE CASCADE,
merge_request_id INTEGER REFERENCES merge_requests(id) ON DELETE CASCADE,
state TEXT NOT NULL,
actor_gitlab_id INTEGER,
actor_username TEXT,
created_at INTEGER NOT NULL, -- ms epoch UTC
source_commit TEXT,
source_merge_request_iid INTEGER, -- iid from source_merge_request ref
CHECK (
(issue_id IS NOT NULL AND merge_request_id IS NULL) OR
(issue_id IS NULL AND merge_request_id IS NOT NULL)
)
);
CREATE UNIQUE INDEX uq_state_events_gitlab ON resource_state_events(gitlab_id, project_id);
CREATE INDEX idx_state_events_issue ON resource_state_events(issue_id) WHERE issue_id IS NOT NULL;
CREATE INDEX idx_state_events_mr ON resource_state_events(merge_request_id) WHERE merge_request_id IS NOT NULL;
CREATE INDEX idx_state_events_created ON resource_state_events(created_at);
-- Label change events (add/remove)
CREATE TABLE resource_label_events (
id INTEGER PRIMARY KEY,
gitlab_id INTEGER NOT NULL,
project_id INTEGER NOT NULL REFERENCES projects(id) ON DELETE CASCADE,
issue_id INTEGER REFERENCES issues(id) ON DELETE CASCADE,
merge_request_id INTEGER REFERENCES merge_requests(id) ON DELETE CASCADE,
action TEXT NOT NULL CHECK (action IN ('add', 'remove')),
label_name TEXT NOT NULL,
actor_gitlab_id INTEGER,
actor_username TEXT,
created_at INTEGER NOT NULL, -- ms epoch UTC
CHECK (
(issue_id IS NOT NULL AND merge_request_id IS NULL) OR
(issue_id IS NULL AND merge_request_id IS NOT NULL)
)
);
CREATE UNIQUE INDEX uq_label_events_gitlab ON resource_label_events(gitlab_id, project_id);
CREATE INDEX idx_label_events_issue ON resource_label_events(issue_id) WHERE issue_id IS NOT NULL;
CREATE INDEX idx_label_events_mr ON resource_label_events(merge_request_id) WHERE merge_request_id IS NOT NULL;
CREATE INDEX idx_label_events_created ON resource_label_events(created_at);
-- Milestone change events (add/remove)
CREATE TABLE resource_milestone_events (
id INTEGER PRIMARY KEY,
gitlab_id INTEGER NOT NULL,
project_id INTEGER NOT NULL REFERENCES projects(id) ON DELETE CASCADE,
issue_id INTEGER REFERENCES issues(id) ON DELETE CASCADE,
merge_request_id INTEGER REFERENCES merge_requests(id) ON DELETE CASCADE,
action TEXT NOT NULL CHECK (action IN ('add', 'remove')),
milestone_title TEXT NOT NULL,
milestone_id INTEGER,
actor_gitlab_id INTEGER,
actor_username TEXT,
created_at INTEGER NOT NULL, -- ms epoch UTC
CHECK (
(issue_id IS NOT NULL AND merge_request_id IS NULL) OR
(issue_id IS NULL AND merge_request_id IS NOT NULL)
)
);
CREATE UNIQUE INDEX uq_milestone_events_gitlab ON resource_milestone_events(gitlab_id, project_id);
CREATE INDEX idx_milestone_events_issue ON resource_milestone_events(issue_id) WHERE issue_id IS NOT NULL;
CREATE INDEX idx_milestone_events_mr ON resource_milestone_events(merge_request_id) WHERE merge_request_id IS NOT NULL;
CREATE INDEX idx_milestone_events_created ON resource_milestone_events(created_at);
-- Cross-reference table (Gate 2): source/target entity pairs
CREATE TABLE entity_references (
id INTEGER PRIMARY KEY,
project_id INTEGER NOT NULL REFERENCES projects(id) ON DELETE CASCADE,
source_entity_type TEXT NOT NULL CHECK (source_entity_type IN ('issue', 'merge_request')),
source_entity_id INTEGER NOT NULL, -- local DB id
target_entity_type TEXT NOT NULL CHECK (target_entity_type IN ('issue', 'merge_request')),
target_entity_id INTEGER, -- local DB id (NULL if unresolved)
target_project_path TEXT, -- for unresolved cross-project refs
target_entity_iid INTEGER, -- for unresolved refs
reference_type TEXT NOT NULL CHECK (reference_type IN ('closes', 'mentioned', 'related')),
source_method TEXT NOT NULL CHECK (source_method IN ('api', 'note_parse', 'description_parse')),
created_at INTEGER NOT NULL -- ms epoch UTC
);
CREATE UNIQUE INDEX uq_entity_refs ON entity_references(
project_id,
source_entity_type,
source_entity_id,
target_entity_type,
COALESCE(target_entity_id, -1),
COALESCE(target_project_path, ''),
COALESCE(target_entity_iid, -1),
reference_type,
source_method
);
CREATE INDEX idx_entity_refs_source ON entity_references(source_entity_type, source_entity_id);
CREATE INDEX idx_entity_refs_target ON entity_references(target_entity_id) WHERE target_entity_id IS NOT NULL;
CREATE INDEX idx_entity_refs_unresolved ON entity_references(target_project_path, target_entity_iid) WHERE target_entity_id IS NULL;
-- Generic dependent fetch queue (resource_events, mr_closes_issues, mr_diffs)
CREATE TABLE pending_dependent_fetches (
id INTEGER PRIMARY KEY,
project_id INTEGER NOT NULL REFERENCES projects(id) ON DELETE CASCADE,
entity_type TEXT NOT NULL CHECK (entity_type IN ('issue', 'merge_request')),
entity_iid INTEGER NOT NULL,
entity_local_id INTEGER NOT NULL,
job_type TEXT NOT NULL CHECK (job_type IN ('resource_events', 'mr_closes_issues', 'mr_diffs')),
payload_json TEXT, -- optional extra data for the job
enqueued_at INTEGER NOT NULL, -- ms epoch UTC
locked_at INTEGER, -- ms epoch UTC (NULL = available)
attempts INTEGER NOT NULL DEFAULT 0,
next_retry_at INTEGER, -- ms epoch UTC (NULL = no backoff)
last_error TEXT
);
CREATE UNIQUE INDEX uq_pending_fetches ON pending_dependent_fetches(project_id, entity_type, entity_iid, job_type);
CREATE INDEX idx_pending_fetches_claimable ON pending_dependent_fetches(job_type, locked_at) WHERE locked_at IS NULL;
CREATE INDEX idx_pending_fetches_retryable ON pending_dependent_fetches(next_retry_at) WHERE locked_at IS NULL AND next_retry_at IS NOT NULL;
-- Update schema version
INSERT INTO schema_version (version, applied_at, description)
VALUES (11, strftime('%s', 'now') * 1000, 'Resource events, entity references, and dependent fetch queue');

View File

@@ -7,6 +7,7 @@ use serde::Serialize;
use crate::Config; use crate::Config;
use crate::core::db::create_connection; use crate::core::db::create_connection;
use crate::core::error::Result; use crate::core::error::Result;
use crate::core::events_db::{self, EventCounts};
use crate::core::paths::get_db_path; use crate::core::paths::get_db_path;
/// Result of count query. /// Result of count query.
@@ -237,6 +238,109 @@ struct CountJsonBreakdown {
locked: Option<i64>, locked: Option<i64>,
} }
/// Run the event count query.
pub fn run_count_events(config: &Config) -> Result<EventCounts> {
let db_path = get_db_path(config.storage.db_path.as_deref());
let conn = create_connection(&db_path)?;
events_db::count_events(&conn)
}
/// JSON output structure for event counts.
#[derive(Serialize)]
struct EventCountJsonOutput {
ok: bool,
data: EventCountJsonData,
}
#[derive(Serialize)]
struct EventCountJsonData {
state_events: EventTypeCounts,
label_events: EventTypeCounts,
milestone_events: EventTypeCounts,
total: usize,
}
#[derive(Serialize)]
struct EventTypeCounts {
issue: usize,
merge_request: usize,
total: usize,
}
/// Print event counts as JSON (robot mode).
pub fn print_event_count_json(counts: &EventCounts) {
let output = EventCountJsonOutput {
ok: true,
data: EventCountJsonData {
state_events: EventTypeCounts {
issue: counts.state_issue,
merge_request: counts.state_mr,
total: counts.state_issue + counts.state_mr,
},
label_events: EventTypeCounts {
issue: counts.label_issue,
merge_request: counts.label_mr,
total: counts.label_issue + counts.label_mr,
},
milestone_events: EventTypeCounts {
issue: counts.milestone_issue,
merge_request: counts.milestone_mr,
total: counts.milestone_issue + counts.milestone_mr,
},
total: counts.total(),
},
};
println!("{}", serde_json::to_string(&output).unwrap());
}
/// Print event counts (human-readable).
pub fn print_event_count(counts: &EventCounts) {
println!(
"{:<20} {:>8} {:>8} {:>8}",
style("Event Type").cyan().bold(),
style("Issues").bold(),
style("MRs").bold(),
style("Total").bold()
);
let state_total = counts.state_issue + counts.state_mr;
let label_total = counts.label_issue + counts.label_mr;
let milestone_total = counts.milestone_issue + counts.milestone_mr;
println!(
"{:<20} {:>8} {:>8} {:>8}",
"State events",
format_number(counts.state_issue as i64),
format_number(counts.state_mr as i64),
format_number(state_total as i64)
);
println!(
"{:<20} {:>8} {:>8} {:>8}",
"Label events",
format_number(counts.label_issue as i64),
format_number(counts.label_mr as i64),
format_number(label_total as i64)
);
println!(
"{:<20} {:>8} {:>8} {:>8}",
"Milestone events",
format_number(counts.milestone_issue as i64),
format_number(counts.milestone_mr as i64),
format_number(milestone_total as i64)
);
let total_issues = counts.state_issue + counts.label_issue + counts.milestone_issue;
let total_mrs = counts.state_mr + counts.label_mr + counts.milestone_mr;
println!(
"{:<20} {:>8} {:>8} {:>8}",
style("Total").bold(),
format_number(total_issues as i64),
format_number(total_mrs as i64),
style(format_number(counts.total() as i64)).bold()
);
}
/// Print count result as JSON (robot mode). /// Print count result as JSON (robot mode).
pub fn print_count_json(result: &CountResult) { pub fn print_count_json(result: &CountResult) {
let breakdown = result.state_breakdown.as_ref().map(|b| CountJsonBreakdown { let breakdown = result.state_breakdown.as_ref().map(|b| CountJsonBreakdown {

View File

@@ -15,7 +15,10 @@ pub mod sync;
pub mod sync_status; pub mod sync_status;
pub use auth_test::run_auth_test; pub use auth_test::run_auth_test;
pub use count::{print_count, print_count_json, run_count}; pub use count::{
print_count, print_count_json, print_event_count, print_event_count_json, run_count,
run_count_events,
};
pub use doctor::{print_doctor_results, run_doctor}; pub use doctor::{print_doctor_results, run_doctor};
pub use embed::{print_embed, print_embed_json, run_embed}; pub use embed::{print_embed, print_embed_json, run_embed};
pub use generate_docs::{print_generate_docs, print_generate_docs_json, run_generate_docs}; pub use generate_docs::{print_generate_docs, print_generate_docs_json, run_generate_docs};

View File

@@ -47,6 +47,9 @@ pub struct QueueStats {
pub dirty_sources_failed: i64, pub dirty_sources_failed: i64,
pub pending_discussion_fetches: i64, pub pending_discussion_fetches: i64,
pub pending_discussion_fetches_failed: i64, pub pending_discussion_fetches_failed: i64,
pub pending_dependent_fetches: i64,
pub pending_dependent_fetches_failed: i64,
pub pending_dependent_fetches_stuck: i64,
} }
#[derive(Debug, Default, Serialize)] #[derive(Debug, Default, Serialize)]
@@ -55,6 +58,11 @@ pub struct IntegrityResult {
pub fts_doc_mismatch: bool, pub fts_doc_mismatch: bool,
pub orphan_embeddings: i64, pub orphan_embeddings: i64,
pub stale_metadata: i64, pub stale_metadata: i64,
pub orphan_state_events: i64,
pub orphan_label_events: i64,
pub orphan_milestone_events: i64,
pub queue_stuck_locks: i64,
pub queue_max_attempts: i64,
#[serde(skip_serializing_if = "Option::is_none")] #[serde(skip_serializing_if = "Option::is_none")]
pub repair: Option<RepairResult>, pub repair: Option<RepairResult>,
} }
@@ -127,6 +135,21 @@ pub fn run_stats(
)?; )?;
} }
if table_exists(&conn, "pending_dependent_fetches") {
result.queues.pending_dependent_fetches = count_query(
&conn,
"SELECT COUNT(*) FROM pending_dependent_fetches WHERE last_error IS NULL",
)?;
result.queues.pending_dependent_fetches_failed = count_query(
&conn,
"SELECT COUNT(*) FROM pending_dependent_fetches WHERE last_error IS NOT NULL",
)?;
result.queues.pending_dependent_fetches_stuck = count_query(
&conn,
"SELECT COUNT(*) FROM pending_dependent_fetches WHERE locked_at IS NOT NULL",
)?;
}
// Integrity check // Integrity check
if check { if check {
let mut integrity = IntegrityResult::default(); let mut integrity = IntegrityResult::default();
@@ -153,9 +176,52 @@ pub fn run_stats(
)?; )?;
} }
// Orphaned resource events (FK targets missing)
if table_exists(&conn, "resource_state_events") {
integrity.orphan_state_events = count_query(
&conn,
"SELECT COUNT(*) FROM resource_state_events rse
WHERE (rse.issue_id IS NOT NULL AND NOT EXISTS (SELECT 1 FROM issues i WHERE i.id = rse.issue_id))
OR (rse.merge_request_id IS NOT NULL AND NOT EXISTS (SELECT 1 FROM merge_requests m WHERE m.id = rse.merge_request_id))",
)?;
}
if table_exists(&conn, "resource_label_events") {
integrity.orphan_label_events = count_query(
&conn,
"SELECT COUNT(*) FROM resource_label_events rle
WHERE (rle.issue_id IS NOT NULL AND NOT EXISTS (SELECT 1 FROM issues i WHERE i.id = rle.issue_id))
OR (rle.merge_request_id IS NOT NULL AND NOT EXISTS (SELECT 1 FROM merge_requests m WHERE m.id = rle.merge_request_id))",
)?;
}
if table_exists(&conn, "resource_milestone_events") {
integrity.orphan_milestone_events = count_query(
&conn,
"SELECT COUNT(*) FROM resource_milestone_events rme
WHERE (rme.issue_id IS NOT NULL AND NOT EXISTS (SELECT 1 FROM issues i WHERE i.id = rme.issue_id))
OR (rme.merge_request_id IS NOT NULL AND NOT EXISTS (SELECT 1 FROM merge_requests m WHERE m.id = rme.merge_request_id))",
)?;
}
// Queue health: stuck locks and max retry attempts
if table_exists(&conn, "pending_dependent_fetches") {
integrity.queue_stuck_locks = count_query(
&conn,
"SELECT COUNT(*) FROM pending_dependent_fetches WHERE locked_at IS NOT NULL",
)?;
integrity.queue_max_attempts = count_query(
&conn,
"SELECT COALESCE(MAX(attempts), 0) FROM pending_dependent_fetches",
)?;
}
let orphan_events = integrity.orphan_state_events
+ integrity.orphan_label_events
+ integrity.orphan_milestone_events;
integrity.ok = !integrity.fts_doc_mismatch integrity.ok = !integrity.fts_doc_mismatch
&& integrity.orphan_embeddings == 0 && integrity.orphan_embeddings == 0
&& integrity.stale_metadata == 0; && integrity.stale_metadata == 0
&& orphan_events == 0;
// Repair // Repair
if repair { if repair {
@@ -260,6 +326,17 @@ pub fn print_stats(result: &StatsResult) {
result.queues.pending_discussion_fetches, result.queues.pending_discussion_fetches,
result.queues.pending_discussion_fetches_failed result.queues.pending_discussion_fetches_failed
); );
if result.queues.pending_dependent_fetches > 0
|| result.queues.pending_dependent_fetches_failed > 0
|| result.queues.pending_dependent_fetches_stuck > 0
{
println!(
" Dependent fetch: {} pending, {} failed, {} stuck",
result.queues.pending_dependent_fetches,
result.queues.pending_dependent_fetches_failed,
result.queues.pending_dependent_fetches_stuck
);
}
if let Some(ref integrity) = result.integrity { if let Some(ref integrity) = result.integrity {
println!(); println!();
@@ -287,6 +364,33 @@ pub fn print_stats(result: &StatsResult) {
integrity.stale_metadata integrity.stale_metadata
); );
} }
let orphan_events = integrity.orphan_state_events
+ integrity.orphan_label_events
+ integrity.orphan_milestone_events;
if orphan_events > 0 {
println!(
" {} {} orphan resource events (state: {}, label: {}, milestone: {})",
style("!").red(),
orphan_events,
integrity.orphan_state_events,
integrity.orphan_label_events,
integrity.orphan_milestone_events
);
}
if integrity.queue_stuck_locks > 0 {
println!(
" {} {} stuck queue locks",
style("!").yellow(),
integrity.queue_stuck_locks
);
}
if integrity.queue_max_attempts > 3 {
println!(
" {} max queue retry attempts: {}",
style("!").yellow(),
integrity.queue_max_attempts
);
}
if let Some(ref repair) = integrity.repair { if let Some(ref repair) = integrity.repair {
println!(); println!();
@@ -336,6 +440,11 @@ pub fn print_stats_json(result: &StatsResult) {
fts_doc_mismatch: i.fts_doc_mismatch, fts_doc_mismatch: i.fts_doc_mismatch,
orphan_embeddings: i.orphan_embeddings, orphan_embeddings: i.orphan_embeddings,
stale_metadata: i.stale_metadata, stale_metadata: i.stale_metadata,
orphan_state_events: i.orphan_state_events,
orphan_label_events: i.orphan_label_events,
orphan_milestone_events: i.orphan_milestone_events,
queue_stuck_locks: i.queue_stuck_locks,
queue_max_attempts: i.queue_max_attempts,
repair: i.repair.as_ref().map(|r| RepairResult { repair: i.repair.as_ref().map(|r| RepairResult {
fts_rebuilt: r.fts_rebuilt, fts_rebuilt: r.fts_rebuilt,
orphans_deleted: r.orphans_deleted, orphans_deleted: r.orphans_deleted,

View File

@@ -19,6 +19,7 @@ pub struct SyncOptions {
pub force: bool, pub force: bool,
pub no_embed: bool, pub no_embed: bool,
pub no_docs: bool, pub no_docs: bool,
pub no_events: bool,
pub robot_mode: bool, pub robot_mode: bool,
} }

View File

@@ -478,6 +478,10 @@ pub struct SyncArgs {
/// Skip document regeneration /// Skip document regeneration
#[arg(long)] #[arg(long)]
pub no_docs: bool, pub no_docs: bool,
/// Skip resource event fetching (overrides config)
#[arg(long = "no-events")]
pub no_events: bool,
} }
/// Arguments for `lore embed` /// Arguments for `lore embed`
@@ -501,8 +505,8 @@ pub struct EmbedArgs {
/// Arguments for `lore count <ENTITY>` /// Arguments for `lore count <ENTITY>`
#[derive(Parser)] #[derive(Parser)]
pub struct CountArgs { pub struct CountArgs {
/// Entity type to count (issues, mrs, discussions, notes) /// Entity type to count (issues, mrs, discussions, notes, events)
#[arg(value_parser = ["issues", "mrs", "discussions", "notes"])] #[arg(value_parser = ["issues", "mrs", "discussions", "notes", "events"])]
pub entity: String, pub entity: String,
/// Parent type filter: issue or mr (for discussions/notes) /// Parent type filter: issue or mr (for discussions/notes)

View File

@@ -50,6 +50,13 @@ pub struct SyncConfig {
#[serde(rename = "dependentConcurrency")] #[serde(rename = "dependentConcurrency")]
pub dependent_concurrency: u32, pub dependent_concurrency: u32,
#[serde(rename = "fetchResourceEvents", default = "default_true")]
pub fetch_resource_events: bool,
}
fn default_true() -> bool {
true
} }
impl Default for SyncConfig { impl Default for SyncConfig {
@@ -61,6 +68,7 @@ impl Default for SyncConfig {
cursor_rewind_seconds: 2, cursor_rewind_seconds: 2,
primary_concurrency: 4, primary_concurrency: 4,
dependent_concurrency: 2, dependent_concurrency: 2,
fetch_resource_events: true,
} }
} }
} }

View File

@@ -47,6 +47,10 @@ const MIGRATIONS: &[(&str, &str)] = &[
"010", "010",
include_str!("../../migrations/010_chunk_config.sql"), include_str!("../../migrations/010_chunk_config.sql"),
), ),
(
"011",
include_str!("../../migrations/011_resource_events.sql"),
),
]; ];
/// Create a database connection with production-grade pragmas. /// Create a database connection with production-grade pragmas.

177
src/core/dependent_queue.rs Normal file
View File

@@ -0,0 +1,177 @@
//! Generic dependent fetch queue for resource events, MR closes, and MR diffs.
//!
//! Provides enqueue, claim, complete, fail (with exponential backoff), and
//! stale lock reclamation operations against the `pending_dependent_fetches` table.
use std::collections::HashMap;
use rusqlite::Connection;
use super::error::Result;
use super::time::now_ms;
/// A pending job from the dependent fetch queue.
#[derive(Debug)]
pub struct PendingJob {
pub id: i64,
pub project_id: i64,
pub entity_type: String,
pub entity_iid: i64,
pub entity_local_id: i64,
pub job_type: String,
pub payload_json: Option<String>,
pub attempts: i32,
}
/// Enqueue a dependent fetch job. Idempotent via UNIQUE constraint (INSERT OR IGNORE).
///
/// Returns `true` if actually inserted (not deduped).
pub fn enqueue_job(
conn: &Connection,
project_id: i64,
entity_type: &str,
entity_iid: i64,
entity_local_id: i64,
job_type: &str,
payload_json: Option<&str>,
) -> Result<bool> {
let now = now_ms();
let changes = conn.execute(
"INSERT OR IGNORE INTO pending_dependent_fetches
(project_id, entity_type, entity_iid, entity_local_id, job_type, payload_json, enqueued_at)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
rusqlite::params![project_id, entity_type, entity_iid, entity_local_id, job_type, payload_json, now],
)?;
Ok(changes > 0)
}
/// Claim a batch of jobs for processing.
///
/// Atomically sets `locked_at` on the claimed jobs. Only claims jobs where
/// `locked_at IS NULL` and `(next_retry_at IS NULL OR next_retry_at <= now)`.
pub fn claim_jobs(conn: &Connection, job_type: &str, batch_size: usize) -> Result<Vec<PendingJob>> {
if batch_size == 0 {
return Ok(Vec::new());
}
let now = now_ms();
// Find available jobs
let mut select_stmt = conn.prepare_cached(
"SELECT id, project_id, entity_type, entity_iid, entity_local_id, job_type, payload_json, attempts
FROM pending_dependent_fetches
WHERE job_type = ?1
AND locked_at IS NULL
AND (next_retry_at IS NULL OR next_retry_at <= ?2)
ORDER BY enqueued_at ASC
LIMIT ?3",
)?;
let jobs: Vec<PendingJob> = select_stmt
.query_map(
rusqlite::params![job_type, now, batch_size as i64],
|row| {
Ok(PendingJob {
id: row.get(0)?,
project_id: row.get(1)?,
entity_type: row.get(2)?,
entity_iid: row.get(3)?,
entity_local_id: row.get(4)?,
job_type: row.get(5)?,
payload_json: row.get(6)?,
attempts: row.get(7)?,
})
},
)?
.collect::<std::result::Result<Vec<_>, _>>()?;
// Lock the claimed jobs
if !jobs.is_empty() {
let ids: Vec<String> = jobs.iter().map(|j| j.id.to_string()).collect();
let placeholders = ids.join(",");
conn.execute(
&format!(
"UPDATE pending_dependent_fetches SET locked_at = ?1 WHERE id IN ({placeholders})"
),
rusqlite::params![now],
)?;
}
Ok(jobs)
}
/// Mark a job as complete (DELETE the row).
pub fn complete_job(conn: &Connection, job_id: i64) -> Result<()> {
conn.execute(
"DELETE FROM pending_dependent_fetches WHERE id = ?1",
rusqlite::params![job_id],
)?;
Ok(())
}
/// Mark a job as failed. Increments attempts, sets next_retry_at with exponential
/// backoff, clears locked_at, and records the error.
///
/// Backoff: 30s * 2^(attempts-1), capped at 480s.
pub fn fail_job(conn: &Connection, job_id: i64, error: &str) -> Result<()> {
let now = now_ms();
// Get current attempts
let current_attempts: i32 = conn
.query_row(
"SELECT attempts FROM pending_dependent_fetches WHERE id = ?1",
rusqlite::params![job_id],
|row| row.get(0),
)
.unwrap_or(0);
let new_attempts = current_attempts + 1;
let backoff_ms: i64 = (30_000i64 * (1i64 << (new_attempts - 1).min(4))).min(480_000);
let next_retry = now + backoff_ms;
conn.execute(
"UPDATE pending_dependent_fetches
SET attempts = ?1, next_retry_at = ?2, locked_at = NULL, last_error = ?3
WHERE id = ?4",
rusqlite::params![new_attempts, next_retry, error, job_id],
)?;
Ok(())
}
/// Reclaim stale locks (locked_at older than threshold).
///
/// Returns count of reclaimed jobs.
pub fn reclaim_stale_locks(conn: &Connection, stale_threshold_minutes: u32) -> Result<usize> {
let threshold_ms = now_ms() - (i64::from(stale_threshold_minutes) * 60 * 1000);
let changes = conn.execute(
"UPDATE pending_dependent_fetches SET locked_at = NULL WHERE locked_at < ?1",
rusqlite::params![threshold_ms],
)?;
Ok(changes)
}
/// Count pending jobs by job_type (for stats/progress).
pub fn count_pending_jobs(conn: &Connection) -> Result<HashMap<String, usize>> {
let mut stmt = conn.prepare_cached(
"SELECT job_type, COUNT(*) FROM pending_dependent_fetches GROUP BY job_type",
)?;
let mut counts = HashMap::new();
let rows = stmt.query_map([], |row| {
let job_type: String = row.get(0)?;
let count: i64 = row.get(1)?;
Ok((job_type, count as usize))
})?;
for row in rows {
let (job_type, count) = row?;
counts.insert(job_type, count);
}
Ok(counts)
}

232
src/core/events_db.rs Normal file
View File

@@ -0,0 +1,232 @@
//! Database upsert functions for resource events (state, label, milestone).
use rusqlite::Connection;
use super::error::{LoreError, Result};
use super::time::iso_to_ms_strict;
use crate::gitlab::types::{GitLabLabelEvent, GitLabMilestoneEvent, GitLabStateEvent};
/// Upsert state events for an entity.
///
/// Uses INSERT OR REPLACE keyed on UNIQUE(gitlab_id, project_id).
/// Wraps in a savepoint for atomicity per entity.
pub fn upsert_state_events(
conn: &mut Connection,
project_id: i64,
entity_type: &str,
entity_local_id: i64,
events: &[GitLabStateEvent],
) -> Result<usize> {
let (issue_id, merge_request_id) = resolve_entity_ids(entity_type, entity_local_id)?;
let sp = conn.savepoint()?;
let mut stmt = sp.prepare_cached(
"INSERT OR REPLACE INTO resource_state_events
(gitlab_id, project_id, issue_id, merge_request_id, state,
actor_gitlab_id, actor_username, created_at,
source_commit, source_merge_request_iid)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10)",
)?;
let mut count = 0;
for event in events {
let created_at = iso_to_ms_strict(&event.created_at).map_err(LoreError::Other)?;
let actor_id = event.user.as_ref().map(|u| u.id);
let actor_username = event.user.as_ref().map(|u| u.username.as_str());
let source_mr_iid = event.source_merge_request.as_ref().map(|mr| mr.iid);
stmt.execute(rusqlite::params![
event.id,
project_id,
issue_id,
merge_request_id,
event.state,
actor_id,
actor_username,
created_at,
event.source_commit,
source_mr_iid,
])?;
count += 1;
}
drop(stmt);
sp.commit()?;
Ok(count)
}
/// Upsert label events for an entity.
pub fn upsert_label_events(
conn: &mut Connection,
project_id: i64,
entity_type: &str,
entity_local_id: i64,
events: &[GitLabLabelEvent],
) -> Result<usize> {
let (issue_id, merge_request_id) = resolve_entity_ids(entity_type, entity_local_id)?;
let sp = conn.savepoint()?;
let mut stmt = sp.prepare_cached(
"INSERT OR REPLACE INTO resource_label_events
(gitlab_id, project_id, issue_id, merge_request_id, action,
label_name, actor_gitlab_id, actor_username, created_at)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)",
)?;
let mut count = 0;
for event in events {
let created_at = iso_to_ms_strict(&event.created_at).map_err(LoreError::Other)?;
let actor_id = event.user.as_ref().map(|u| u.id);
let actor_username = event.user.as_ref().map(|u| u.username.as_str());
stmt.execute(rusqlite::params![
event.id,
project_id,
issue_id,
merge_request_id,
event.action,
event.label.name,
actor_id,
actor_username,
created_at,
])?;
count += 1;
}
drop(stmt);
sp.commit()?;
Ok(count)
}
/// Upsert milestone events for an entity.
pub fn upsert_milestone_events(
conn: &mut Connection,
project_id: i64,
entity_type: &str,
entity_local_id: i64,
events: &[GitLabMilestoneEvent],
) -> Result<usize> {
let (issue_id, merge_request_id) = resolve_entity_ids(entity_type, entity_local_id)?;
let sp = conn.savepoint()?;
let mut stmt = sp.prepare_cached(
"INSERT OR REPLACE INTO resource_milestone_events
(gitlab_id, project_id, issue_id, merge_request_id, action,
milestone_title, milestone_id, actor_gitlab_id, actor_username, created_at)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10)",
)?;
let mut count = 0;
for event in events {
let created_at = iso_to_ms_strict(&event.created_at).map_err(LoreError::Other)?;
let actor_id = event.user.as_ref().map(|u| u.id);
let actor_username = event.user.as_ref().map(|u| u.username.as_str());
stmt.execute(rusqlite::params![
event.id,
project_id,
issue_id,
merge_request_id,
event.action,
event.milestone.title,
event.milestone.id,
actor_id,
actor_username,
created_at,
])?;
count += 1;
}
drop(stmt);
sp.commit()?;
Ok(count)
}
/// Resolve entity type string to (issue_id, merge_request_id) pair.
/// Exactly one is Some, the other is None.
fn resolve_entity_ids(entity_type: &str, entity_local_id: i64) -> Result<(Option<i64>, Option<i64>)> {
match entity_type {
"issue" => Ok((Some(entity_local_id), None)),
"merge_request" => Ok((None, Some(entity_local_id))),
_ => Err(LoreError::Other(format!(
"Invalid entity type for resource events: {entity_type}"
))),
}
}
/// Count resource events by type for the count command.
pub fn count_events(conn: &Connection) -> Result<EventCounts> {
let mut counts = EventCounts::default();
// State events
let row: (i64, i64) = conn
.query_row(
"SELECT
COUNT(CASE WHEN issue_id IS NOT NULL THEN 1 END),
COUNT(CASE WHEN merge_request_id IS NOT NULL THEN 1 END)
FROM resource_state_events",
[],
|row| Ok((row.get(0)?, row.get(1)?)),
)
.unwrap_or((0, 0));
counts.state_issue = row.0 as usize;
counts.state_mr = row.1 as usize;
// Label events
let row: (i64, i64) = conn
.query_row(
"SELECT
COUNT(CASE WHEN issue_id IS NOT NULL THEN 1 END),
COUNT(CASE WHEN merge_request_id IS NOT NULL THEN 1 END)
FROM resource_label_events",
[],
|row| Ok((row.get(0)?, row.get(1)?)),
)
.unwrap_or((0, 0));
counts.label_issue = row.0 as usize;
counts.label_mr = row.1 as usize;
// Milestone events
let row: (i64, i64) = conn
.query_row(
"SELECT
COUNT(CASE WHEN issue_id IS NOT NULL THEN 1 END),
COUNT(CASE WHEN merge_request_id IS NOT NULL THEN 1 END)
FROM resource_milestone_events",
[],
|row| Ok((row.get(0)?, row.get(1)?)),
)
.unwrap_or((0, 0));
counts.milestone_issue = row.0 as usize;
counts.milestone_mr = row.1 as usize;
Ok(counts)
}
/// Event counts broken down by type and entity.
#[derive(Debug, Default)]
pub struct EventCounts {
pub state_issue: usize,
pub state_mr: usize,
pub label_issue: usize,
pub label_mr: usize,
pub milestone_issue: usize,
pub milestone_mr: usize,
}
impl EventCounts {
pub fn total(&self) -> usize {
self.state_issue
+ self.state_mr
+ self.label_issue
+ self.label_mr
+ self.milestone_issue
+ self.milestone_mr
}
}

View File

@@ -3,7 +3,9 @@
pub mod backoff; pub mod backoff;
pub mod config; pub mod config;
pub mod db; pub mod db;
pub mod dependent_queue;
pub mod error; pub mod error;
pub mod events_db;
pub mod lock; pub mod lock;
pub mod paths; pub mod paths;
pub mod payloads; pub mod payloads;

View File

@@ -13,7 +13,8 @@ use tokio::time::sleep;
use tracing::debug; use tracing::debug;
use super::types::{ use super::types::{
GitLabDiscussion, GitLabIssue, GitLabMergeRequest, GitLabProject, GitLabUser, GitLabVersion, GitLabDiscussion, GitLabIssue, GitLabLabelEvent, GitLabMergeRequest, GitLabMilestoneEvent,
GitLabProject, GitLabStateEvent, GitLabUser, GitLabVersion,
}; };
use crate::core::error::{LoreError, Result}; use crate::core::error::{LoreError, Result};
@@ -550,6 +551,152 @@ impl GitLabClient {
} }
} }
/// Resource events API methods.
///
/// These endpoints return per-entity events (not project-wide), so they collect
/// all pages into a Vec rather than using streaming.
impl GitLabClient {
/// Fetch all pages from a paginated endpoint, returning collected results.
async fn fetch_all_pages<T: serde::de::DeserializeOwned>(
&self,
path: &str,
) -> Result<Vec<T>> {
let mut results = Vec::new();
let mut page = 1u32;
let per_page = 100u32;
loop {
let params = vec![
("per_page", per_page.to_string()),
("page", page.to_string()),
];
let (items, headers) = self
.request_with_headers::<Vec<T>>(path, &params)
.await?;
let is_empty = items.is_empty();
let full_page = items.len() as u32 == per_page;
results.extend(items);
let next_page = headers
.get("x-next-page")
.and_then(|v| v.to_str().ok())
.and_then(|s| s.parse::<u32>().ok());
match next_page {
Some(next) if next > page => page = next,
_ => {
if is_empty || !full_page {
break;
}
page += 1;
}
}
}
Ok(results)
}
/// Fetch state events for an issue.
pub async fn fetch_issue_state_events(
&self,
gitlab_project_id: i64,
iid: i64,
) -> Result<Vec<GitLabStateEvent>> {
let path = format!(
"/api/v4/projects/{gitlab_project_id}/issues/{iid}/resource_state_events"
);
self.fetch_all_pages(&path).await
}
/// Fetch label events for an issue.
pub async fn fetch_issue_label_events(
&self,
gitlab_project_id: i64,
iid: i64,
) -> Result<Vec<GitLabLabelEvent>> {
let path = format!(
"/api/v4/projects/{gitlab_project_id}/issues/{iid}/resource_label_events"
);
self.fetch_all_pages(&path).await
}
/// Fetch milestone events for an issue.
pub async fn fetch_issue_milestone_events(
&self,
gitlab_project_id: i64,
iid: i64,
) -> Result<Vec<GitLabMilestoneEvent>> {
let path = format!(
"/api/v4/projects/{gitlab_project_id}/issues/{iid}/resource_milestone_events"
);
self.fetch_all_pages(&path).await
}
/// Fetch state events for a merge request.
pub async fn fetch_mr_state_events(
&self,
gitlab_project_id: i64,
iid: i64,
) -> Result<Vec<GitLabStateEvent>> {
let path = format!(
"/api/v4/projects/{gitlab_project_id}/merge_requests/{iid}/resource_state_events"
);
self.fetch_all_pages(&path).await
}
/// Fetch label events for a merge request.
pub async fn fetch_mr_label_events(
&self,
gitlab_project_id: i64,
iid: i64,
) -> Result<Vec<GitLabLabelEvent>> {
let path = format!(
"/api/v4/projects/{gitlab_project_id}/merge_requests/{iid}/resource_label_events"
);
self.fetch_all_pages(&path).await
}
/// Fetch milestone events for a merge request.
pub async fn fetch_mr_milestone_events(
&self,
gitlab_project_id: i64,
iid: i64,
) -> Result<Vec<GitLabMilestoneEvent>> {
let path = format!(
"/api/v4/projects/{gitlab_project_id}/merge_requests/{iid}/resource_milestone_events"
);
self.fetch_all_pages(&path).await
}
/// Fetch all three event types for an entity in one call.
pub async fn fetch_all_resource_events(
&self,
gitlab_project_id: i64,
entity_type: &str,
iid: i64,
) -> Result<(Vec<GitLabStateEvent>, Vec<GitLabLabelEvent>, Vec<GitLabMilestoneEvent>)> {
match entity_type {
"issue" => {
let state = self.fetch_issue_state_events(gitlab_project_id, iid).await?;
let label = self.fetch_issue_label_events(gitlab_project_id, iid).await?;
let milestone = self.fetch_issue_milestone_events(gitlab_project_id, iid).await?;
Ok((state, label, milestone))
}
"merge_request" => {
let state = self.fetch_mr_state_events(gitlab_project_id, iid).await?;
let label = self.fetch_mr_label_events(gitlab_project_id, iid).await?;
let milestone = self.fetch_mr_milestone_events(gitlab_project_id, iid).await?;
Ok((state, label, milestone))
}
_ => Err(LoreError::Other(format!(
"Invalid entity type for resource events: {entity_type}"
))),
}
}
}
/// Page result for merge request pagination. /// Page result for merge request pagination.
#[derive(Debug)] #[derive(Debug)]
pub struct MergeRequestPage { pub struct MergeRequestPage {

View File

@@ -10,6 +10,7 @@ pub use transformers::{
transform_discussion, transform_issue, transform_notes, transform_discussion, transform_issue, transform_notes,
}; };
pub use types::{ pub use types::{
GitLabAuthor, GitLabDiscussion, GitLabIssue, GitLabNote, GitLabNotePosition, GitLabProject, GitLabAuthor, GitLabDiscussion, GitLabIssue, GitLabLabelEvent, GitLabLabelRef,
GitLabUser, GitLabVersion, GitLabMergeRequestRef, GitLabMilestoneEvent, GitLabMilestoneRef, GitLabNote,
GitLabNotePosition, GitLabProject, GitLabStateEvent, GitLabUser, GitLabVersion,
}; };

View File

@@ -182,6 +182,70 @@ impl GitLabLineRange {
} }
} }
// === Resource Event types (Phase B - Gate 1) ===
/// Reference to an MR in state event's source_merge_request field.
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct GitLabMergeRequestRef {
pub iid: i64,
pub title: Option<String>,
pub web_url: Option<String>,
}
/// Reference to a label in label event's label field.
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct GitLabLabelRef {
pub id: i64,
pub name: String,
pub color: Option<String>,
pub description: Option<String>,
}
/// Reference to a milestone in milestone event's milestone field.
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct GitLabMilestoneRef {
pub id: i64,
pub iid: i64,
pub title: String,
}
/// State change event from the Resource State Events API.
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct GitLabStateEvent {
pub id: i64,
pub user: Option<GitLabAuthor>,
pub created_at: String,
pub resource_type: String,
pub resource_id: i64,
pub state: String,
pub source_commit: Option<String>,
pub source_merge_request: Option<GitLabMergeRequestRef>,
}
/// Label change event from the Resource Label Events API.
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct GitLabLabelEvent {
pub id: i64,
pub user: Option<GitLabAuthor>,
pub created_at: String,
pub resource_type: String,
pub resource_id: i64,
pub label: GitLabLabelRef,
pub action: String,
}
/// Milestone change event from the Resource Milestone Events API.
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct GitLabMilestoneEvent {
pub id: i64,
pub user: Option<GitLabAuthor>,
pub created_at: String,
pub resource_type: String,
pub resource_id: i64,
pub milestone: GitLabMilestoneRef,
pub action: String,
}
// === Checkpoint 2: Merge Request types === // === Checkpoint 2: Merge Request types ===
/// GitLab MR references (short and full reference strings). /// GitLab MR references (short and full reference strings).

View File

@@ -11,14 +11,14 @@ use tracing_subscriber::util::SubscriberInitExt;
use lore::Config; use lore::Config;
use lore::cli::commands::{ use lore::cli::commands::{
InitInputs, InitOptions, InitResult, ListFilters, MrListFilters, SearchCliFilters, open_issue_in_browser, InitInputs, InitOptions, InitResult, ListFilters, MrListFilters, SearchCliFilters, open_issue_in_browser,
open_mr_in_browser, print_count, print_count_json, print_doctor_results, print_generate_docs, open_mr_in_browser, print_count, print_count_json, print_event_count, print_event_count_json, print_doctor_results, print_generate_docs,
print_generate_docs_json, print_ingest_summary, print_ingest_summary_json, print_list_issues, print_generate_docs_json, print_ingest_summary, print_ingest_summary_json, print_list_issues,
print_list_issues_json, print_list_mrs, print_list_mrs_json, print_search_results, print_list_issues_json, print_list_mrs, print_list_mrs_json, print_search_results,
print_search_results_json, print_show_issue, print_show_issue_json, print_show_mr, print_stats, print_search_results_json, print_show_issue, print_show_issue_json, print_show_mr, print_stats,
print_stats_json, print_stats_json,
print_embed, print_embed_json, print_sync, print_sync_json, print_embed, print_embed_json, print_sync, print_sync_json,
print_show_mr_json, print_sync_status, print_sync_status_json, run_auth_test, run_count, print_show_mr_json, print_sync_status, print_sync_status_json, run_auth_test, run_count,
run_doctor, run_embed, run_generate_docs, run_ingest, run_init, run_list_issues, run_list_mrs, run_count_events, run_doctor, run_embed, run_generate_docs, run_ingest, run_init, run_list_issues, run_list_mrs,
run_search, run_show_issue, run_show_mr, run_stats, run_sync, run_sync_status, SyncOptions, run_search, run_show_issue, run_show_mr, run_stats, run_sync, run_sync_status, SyncOptions,
IngestDisplay, IngestDisplay,
}; };
@@ -518,6 +518,16 @@ async fn handle_count(
) -> Result<(), Box<dyn std::error::Error>> { ) -> Result<(), Box<dyn std::error::Error>> {
let config = Config::load(config_override)?; let config = Config::load(config_override)?;
if args.entity == "events" {
let counts = run_count_events(&config)?;
if robot_mode {
print_event_count_json(&counts);
} else {
print_event_count(&counts);
}
return Ok(());
}
let result = run_count(&config, &args.entity, args.for_entity.as_deref())?; let result = run_count(&config, &args.entity, args.for_entity.as_deref())?;
if robot_mode { if robot_mode {
print_count_json(&result); print_count_json(&result);
@@ -1128,12 +1138,16 @@ async fn handle_sync_cmd(
args: SyncArgs, args: SyncArgs,
robot_mode: bool, robot_mode: bool,
) -> Result<(), Box<dyn std::error::Error>> { ) -> Result<(), Box<dyn std::error::Error>> {
let config = Config::load(config_override)?; let mut config = Config::load(config_override)?;
if args.no_events {
config.sync.fetch_resource_events = false;
}
let options = SyncOptions { let options = SyncOptions {
full: args.full && !args.no_full, full: args.full && !args.no_full,
force: args.force && !args.no_force, force: args.force && !args.no_force,
no_embed: args.no_embed, no_embed: args.no_embed,
no_docs: args.no_docs, no_docs: args.no_docs,
no_events: args.no_events,
robot_mode, robot_mode,
}; };

View File

@@ -1,8 +1,10 @@
//! Tests for GitLab API response type deserialization. //! Tests for GitLab API response type deserialization.
use lore::gitlab::types::{ use lore::gitlab::types::{
GitLabAuthor, GitLabDiscussion, GitLabIssue, GitLabMergeRequest, GitLabMilestone, GitLabNote, GitLabAuthor, GitLabDiscussion, GitLabIssue, GitLabLabelEvent, GitLabLabelRef,
GitLabNotePosition, GitLabReferences, GitLabReviewer, GitLabMergeRequest, GitLabMergeRequestRef, GitLabMilestone, GitLabMilestoneEvent,
GitLabMilestoneRef, GitLabNote, GitLabNotePosition, GitLabReferences, GitLabReviewer,
GitLabStateEvent,
}; };
#[test] #[test]
@@ -637,3 +639,209 @@ fn deserializes_diffnote_position_with_line_range() {
assert_eq!(range.start_line(), Some(10)); assert_eq!(range.start_line(), Some(10));
assert_eq!(range.end_line(), Some(15)); assert_eq!(range.end_line(), Some(15));
} }
// === Resource Event type tests ===
#[test]
fn deserializes_state_event_closed_by_mr() {
let json = r#"{
"id": 1001,
"user": {
"id": 42,
"username": "developer",
"name": "Dev User"
},
"created_at": "2024-03-15T10:30:00.000Z",
"resource_type": "Issue",
"resource_id": 555,
"state": "closed",
"source_commit": null,
"source_merge_request": {
"iid": 99,
"title": "Fix the bug",
"web_url": "https://gitlab.example.com/group/project/-/merge_requests/99"
}
}"#;
let event: GitLabStateEvent =
serde_json::from_str(json).expect("Failed to deserialize state event");
assert_eq!(event.id, 1001);
assert!(event.user.is_some());
assert_eq!(event.user.as_ref().unwrap().username, "developer");
assert_eq!(event.resource_type, "Issue");
assert_eq!(event.resource_id, 555);
assert_eq!(event.state, "closed");
assert!(event.source_commit.is_none());
assert!(event.source_merge_request.is_some());
let mr_ref = event.source_merge_request.unwrap();
assert_eq!(mr_ref.iid, 99);
assert_eq!(mr_ref.title, Some("Fix the bug".to_string()));
}
#[test]
fn deserializes_state_event_simple_no_user() {
let json = r#"{
"id": 1002,
"user": null,
"created_at": "2024-03-15T10:30:00.000Z",
"resource_type": "MergeRequest",
"resource_id": 777,
"state": "merged",
"source_commit": "abc123def456",
"source_merge_request": null
}"#;
let event: GitLabStateEvent =
serde_json::from_str(json).expect("Failed to deserialize state event without user");
assert_eq!(event.id, 1002);
assert!(event.user.is_none());
assert_eq!(event.resource_type, "MergeRequest");
assert_eq!(event.state, "merged");
assert_eq!(event.source_commit, Some("abc123def456".to_string()));
assert!(event.source_merge_request.is_none());
}
#[test]
fn deserializes_label_event_add() {
let json = r##"{
"id": 2001,
"user": {
"id": 42,
"username": "developer",
"name": "Dev User"
},
"created_at": "2024-03-15T10:30:00.000Z",
"resource_type": "Issue",
"resource_id": 555,
"label": {
"id": 100,
"name": "bug",
"color": "#FF0000",
"description": "Bug label"
},
"action": "add"
}"##;
let event: GitLabLabelEvent =
serde_json::from_str(json).expect("Failed to deserialize label event");
assert_eq!(event.id, 2001);
assert_eq!(event.action, "add");
assert_eq!(event.label.id, 100);
assert_eq!(event.label.name, "bug");
assert_eq!(event.label.color, Some("#FF0000".to_string()));
assert_eq!(event.label.description, Some("Bug label".to_string()));
}
#[test]
fn deserializes_label_event_remove_null_color() {
let json = r#"{
"id": 2002,
"user": {
"id": 42,
"username": "developer",
"name": "Dev User"
},
"created_at": "2024-03-15T10:30:00.000Z",
"resource_type": "MergeRequest",
"resource_id": 777,
"label": {
"id": 101,
"name": "needs-review",
"color": null,
"description": null
},
"action": "remove"
}"#;
let event: GitLabLabelEvent =
serde_json::from_str(json).expect("Failed to deserialize label remove event");
assert_eq!(event.action, "remove");
assert!(event.label.color.is_none());
assert!(event.label.description.is_none());
}
#[test]
fn deserializes_milestone_event() {
let json = r#"{
"id": 3001,
"user": {
"id": 42,
"username": "developer",
"name": "Dev User"
},
"created_at": "2024-03-15T10:30:00.000Z",
"resource_type": "Issue",
"resource_id": 555,
"milestone": {
"id": 200,
"iid": 5,
"title": "v1.0"
},
"action": "add"
}"#;
let event: GitLabMilestoneEvent =
serde_json::from_str(json).expect("Failed to deserialize milestone event");
assert_eq!(event.id, 3001);
assert_eq!(event.action, "add");
assert_eq!(event.milestone.id, 200);
assert_eq!(event.milestone.iid, 5);
assert_eq!(event.milestone.title, "v1.0");
}
#[test]
fn deserializes_merge_request_ref() {
let json = r#"{
"iid": 42,
"title": "Feature branch",
"web_url": "https://gitlab.example.com/group/project/-/merge_requests/42"
}"#;
let mr_ref: GitLabMergeRequestRef =
serde_json::from_str(json).expect("Failed to deserialize MR ref");
assert_eq!(mr_ref.iid, 42);
assert_eq!(mr_ref.title, Some("Feature branch".to_string()));
assert_eq!(
mr_ref.web_url,
Some("https://gitlab.example.com/group/project/-/merge_requests/42".to_string())
);
}
#[test]
fn deserializes_label_ref() {
let json = r##"{
"id": 100,
"name": "bug",
"color": "#FF0000",
"description": "Bug label"
}"##;
let label_ref: GitLabLabelRef =
serde_json::from_str(json).expect("Failed to deserialize label ref");
assert_eq!(label_ref.id, 100);
assert_eq!(label_ref.name, "bug");
assert_eq!(label_ref.color, Some("#FF0000".to_string()));
}
#[test]
fn deserializes_milestone_ref() {
let json = r#"{
"id": 200,
"iid": 5,
"title": "v1.0"
}"#;
let ms_ref: GitLabMilestoneRef =
serde_json::from_str(json).expect("Failed to deserialize milestone ref");
assert_eq!(ms_ref.id, 200);
assert_eq!(ms_ref.iid, 5);
assert_eq!(ms_ref.title, "v1.0");
}