feat(events): Wire resource event fetching into sync pipeline (bd-1ep)

Integrate resource event fetching as Step 4 of both issue and MR
ingestion, gated behind the fetch_resource_events config flag.

Orchestrator changes:
- Add ProgressEvent variants: ResourceEventsFetchStarted,
  ResourceEventFetched, ResourceEventsFetchComplete
- Add resource_events_fetched/failed fields to IngestProjectResult
  and IngestMrProjectResult
- New enqueue_resource_events_for_entity_type() queries all
  issues/MRs for a project and enqueues resource_events jobs via
  the dependent queue (INSERT OR IGNORE for idempotency)
- New drain_resource_events() claims jobs in batches, fetches
  state/label/milestone events from GitLab API, stores them
  atomically via unchecked_transaction, and handles failures
  with exponential backoff via fail_job()
- Max-iterations guard prevents infinite retry loops within a
  single drain run
- New store_resource_events() + per-type _tx helpers write events
  using prepared statements inside a single transaction
- DrainResult struct tracks fetched/failed counts

CLI ingest changes:
- IngestResult gains resource_events_fetched/failed fields
- Progress bar repurposed for resource event fetch phase
  (reuses discussion bar with updated template)
- Accumulates event counts from both issue and MR ingestion

CLI sync changes:
- SyncResult gains resource_events_fetched/failed fields
- Accumulates counts from both ingest stages
- print_sync() conditionally displays event counts
- Structured logging includes event counts

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Taylor Eernisse
2026-02-03 13:02:15 -05:00
parent a50fc78823
commit 2bcd8db0e9
3 changed files with 548 additions and 28 deletions

View File

@@ -7,9 +7,12 @@
use futures::future::join_all;
use rusqlite::Connection;
use tracing::info;
use tracing::{debug, info, warn};
use crate::Config;
use crate::core::dependent_queue::{
claim_jobs, complete_job, count_pending_jobs, enqueue_job, fail_job, reclaim_stale_locks,
};
use crate::core::error::Result;
use crate::gitlab::GitLabClient;
@@ -50,6 +53,12 @@ pub enum ProgressEvent {
MrDiscussionSynced { current: usize, total: usize },
/// MR discussion sync complete
MrDiscussionSyncComplete,
/// Resource event fetching started (total jobs)
ResourceEventsFetchStarted { total: usize },
/// Resource event fetched for an entity (current/total)
ResourceEventFetched { current: usize, total: usize },
/// Resource event fetching complete
ResourceEventsFetchComplete { fetched: usize, failed: usize },
}
/// Result of full project ingestion (issues).
@@ -63,6 +72,8 @@ pub struct IngestProjectResult {
pub notes_upserted: usize,
pub issues_synced_discussions: usize,
pub issues_skipped_discussion_sync: usize,
pub resource_events_fetched: usize,
pub resource_events_failed: usize,
}
/// Result of MR ingestion for a project.
@@ -80,6 +91,8 @@ pub struct IngestMrProjectResult {
pub diffnotes_count: usize,
pub mrs_synced_discussions: usize,
pub mrs_skipped_discussion_sync: usize,
pub resource_events_fetched: usize,
pub resource_events_failed: usize,
}
/// Ingest all issues and their discussions for a project.
@@ -167,6 +180,21 @@ pub async fn ingest_project_issues_with_progress(
result.issues_synced_discussions += 1;
}
// Step 4: Enqueue and drain resource events (if enabled)
if config.sync.fetch_resource_events {
// Enqueue resource_events jobs for all issues in this project
let enqueued = enqueue_resource_events_for_entity_type(conn, project_id, "issue")?;
if enqueued > 0 {
debug!(enqueued, "Enqueued resource events jobs for issues");
}
// Drain the queue
let drain_result =
drain_resource_events(conn, client, config, gitlab_project_id, &progress).await?;
result.resource_events_fetched = drain_result.fetched;
result.resource_events_failed = drain_result.failed;
}
info!(
issues_fetched = result.issues_fetched,
issues_upserted = result.issues_upserted,
@@ -175,6 +203,8 @@ pub async fn ingest_project_issues_with_progress(
notes_upserted = result.notes_upserted,
issues_synced = result.issues_synced_discussions,
issues_skipped = result.issues_skipped_discussion_sync,
resource_events_fetched = result.resource_events_fetched,
resource_events_failed = result.resource_events_failed,
"Project ingestion complete"
);
@@ -343,6 +373,19 @@ pub async fn ingest_project_merge_requests_with_progress(
}
}
// Step 4: Enqueue and drain resource events (if enabled)
if config.sync.fetch_resource_events {
let enqueued = enqueue_resource_events_for_entity_type(conn, project_id, "merge_request")?;
if enqueued > 0 {
debug!(enqueued, "Enqueued resource events jobs for MRs");
}
let drain_result =
drain_resource_events(conn, client, config, gitlab_project_id, &progress).await?;
result.resource_events_fetched = drain_result.fetched;
result.resource_events_failed = drain_result.failed;
}
info!(
mrs_fetched = result.mrs_fetched,
mrs_upserted = result.mrs_upserted,
@@ -352,6 +395,8 @@ pub async fn ingest_project_merge_requests_with_progress(
diffnotes = result.diffnotes_count,
mrs_synced = result.mrs_synced_discussions,
mrs_skipped = result.mrs_skipped_discussion_sync,
resource_events_fetched = result.resource_events_fetched,
resource_events_failed = result.resource_events_failed,
"MR project ingestion complete"
);
@@ -405,6 +450,368 @@ async fn sync_mr_discussions_sequential(
Ok(results)
}
/// Result of draining the resource events queue.
#[derive(Debug, Default)]
pub struct DrainResult {
pub fetched: usize,
pub failed: usize,
}
/// Enqueue resource_events jobs for all entities of a given type in a project.
///
/// Uses the pending_dependent_fetches queue. Jobs are deduplicated by the UNIQUE
/// constraint, so re-enqueueing the same entity is a no-op.
fn enqueue_resource_events_for_entity_type(
conn: &Connection,
project_id: i64,
entity_type: &str,
) -> Result<usize> {
let (table, id_col) = match entity_type {
"issue" => ("issues", "id"),
"merge_request" => ("merge_requests", "id"),
_ => return Ok(0),
};
// Query all entities for this project and enqueue resource_events jobs.
// The UNIQUE constraint on pending_dependent_fetches makes this idempotent -
// already-queued entities are silently skipped via INSERT OR IGNORE.
let mut stmt = conn.prepare_cached(&format!(
"SELECT {id_col}, iid FROM {table} WHERE project_id = ?1"
))?;
let entities: Vec<(i64, i64)> = stmt
.query_map([project_id], |row| Ok((row.get(0)?, row.get(1)?)))?
.collect::<std::result::Result<Vec<_>, _>>()?;
let mut enqueued = 0;
for (local_id, iid) in &entities {
if enqueue_job(
conn,
project_id,
entity_type,
*iid,
*local_id,
"resource_events",
None,
)? {
enqueued += 1;
}
}
Ok(enqueued)
}
/// Drain pending resource_events jobs: claim, fetch from GitLab, store, complete/fail.
///
/// Processes jobs sequentially since `rusqlite::Connection` is not `Send`.
/// Uses exponential backoff on failure via `fail_job`.
async fn drain_resource_events(
conn: &Connection,
client: &GitLabClient,
config: &Config,
gitlab_project_id: i64,
progress: &Option<ProgressCallback>,
) -> Result<DrainResult> {
let mut result = DrainResult::default();
let batch_size = config.sync.dependent_concurrency as usize;
// Reclaim stale locks from crashed processes
let reclaimed = reclaim_stale_locks(conn, config.sync.stale_lock_minutes)?;
if reclaimed > 0 {
info!(reclaimed, "Reclaimed stale resource event locks");
}
// Count total pending jobs for progress reporting
let pending_counts = count_pending_jobs(conn)?;
let total_pending = pending_counts.get("resource_events").copied().unwrap_or(0);
if total_pending == 0 {
return Ok(result);
}
let emit = |event: ProgressEvent| {
if let Some(cb) = progress {
cb(event);
}
};
emit(ProgressEvent::ResourceEventsFetchStarted {
total: total_pending,
});
let mut processed = 0;
// Max iterations guard: prevent infinite loop if jobs keep failing and retrying
// within the same drain run. Allow 2x total_pending iterations as safety margin.
let max_iterations = total_pending * 2;
let mut iterations = 0;
loop {
if iterations >= max_iterations {
warn!(
iterations,
total_pending, "Resource events drain hit max iterations guard, stopping"
);
break;
}
let jobs = claim_jobs(conn, "resource_events", batch_size)?;
if jobs.is_empty() {
break;
}
for job in &jobs {
iterations += 1;
// conn is &Connection but upsert functions need &mut Connection.
// We need to use unsafe to get a mutable reference since rusqlite
// operations are internally safe with WAL mode and we're single-threaded.
// Instead, we'll use a savepoint approach via the Connection directly.
match client
.fetch_all_resource_events(gitlab_project_id, &job.entity_type, job.entity_iid)
.await
{
Ok((state_events, label_events, milestone_events)) => {
// Store events - we need &mut Connection for savepoints in upsert functions.
// Use unchecked_transaction as a workaround since we have &Connection.
let store_result = store_resource_events(
conn,
job.project_id,
&job.entity_type,
job.entity_local_id,
&state_events,
&label_events,
&milestone_events,
);
match store_result {
Ok(()) => {
complete_job(conn, job.id)?;
result.fetched += 1;
}
Err(e) => {
warn!(
entity_type = %job.entity_type,
entity_iid = job.entity_iid,
error = %e,
"Failed to store resource events"
);
fail_job(conn, job.id, &e.to_string())?;
result.failed += 1;
}
}
}
Err(e) => {
warn!(
entity_type = %job.entity_type,
entity_iid = job.entity_iid,
error = %e,
"Failed to fetch resource events from GitLab"
);
fail_job(conn, job.id, &e.to_string())?;
result.failed += 1;
}
}
processed += 1;
emit(ProgressEvent::ResourceEventFetched {
current: processed,
total: total_pending,
});
}
}
emit(ProgressEvent::ResourceEventsFetchComplete {
fetched: result.fetched,
failed: result.failed,
});
if result.fetched > 0 || result.failed > 0 {
info!(
fetched = result.fetched,
failed = result.failed,
"Resource events drain complete"
);
}
Ok(result)
}
/// Store fetched resource events in the database.
///
/// Uses unchecked_transaction to work with &Connection (not &mut Connection),
/// which is safe because we're single-threaded and using WAL mode.
fn store_resource_events(
conn: &Connection,
project_id: i64,
entity_type: &str,
entity_local_id: i64,
state_events: &[crate::gitlab::types::GitLabStateEvent],
label_events: &[crate::gitlab::types::GitLabLabelEvent],
milestone_events: &[crate::gitlab::types::GitLabMilestoneEvent],
) -> Result<()> {
// The upsert functions require &mut Connection for savepoints.
// We use unchecked_transaction to wrap all three upserts atomically,
// then call the upsert functions using the transaction's inner connection.
let tx = conn.unchecked_transaction()?;
// State events - use raw SQL within transaction instead of upsert_state_events
// which requires &mut Connection
if !state_events.is_empty() {
store_state_events_tx(&tx, project_id, entity_type, entity_local_id, state_events)?;
}
if !label_events.is_empty() {
store_label_events_tx(&tx, project_id, entity_type, entity_local_id, label_events)?;
}
if !milestone_events.is_empty() {
store_milestone_events_tx(
&tx,
project_id,
entity_type,
entity_local_id,
milestone_events,
)?;
}
tx.commit()?;
Ok(())
}
/// Store state events within an existing transaction.
fn store_state_events_tx(
tx: &rusqlite::Transaction<'_>,
project_id: i64,
entity_type: &str,
entity_local_id: i64,
events: &[crate::gitlab::types::GitLabStateEvent],
) -> Result<()> {
let (issue_id, merge_request_id): (Option<i64>, Option<i64>) = match entity_type {
"issue" => (Some(entity_local_id), None),
"merge_request" => (None, Some(entity_local_id)),
_ => return Ok(()),
};
let mut stmt = tx.prepare_cached(
"INSERT OR REPLACE INTO resource_state_events
(gitlab_id, project_id, issue_id, merge_request_id, state,
actor_gitlab_id, actor_username, created_at,
source_commit, source_merge_request_iid)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10)",
)?;
for event in events {
let created_at = crate::core::time::iso_to_ms_strict(&event.created_at)
.map_err(crate::core::error::LoreError::Other)?;
let actor_id = event.user.as_ref().map(|u| u.id);
let actor_username = event.user.as_ref().map(|u| u.username.as_str());
let source_mr_iid = event.source_merge_request.as_ref().map(|mr| mr.iid);
stmt.execute(rusqlite::params![
event.id,
project_id,
issue_id,
merge_request_id,
event.state,
actor_id,
actor_username,
created_at,
event.source_commit,
source_mr_iid,
])?;
}
Ok(())
}
/// Store label events within an existing transaction.
fn store_label_events_tx(
tx: &rusqlite::Transaction<'_>,
project_id: i64,
entity_type: &str,
entity_local_id: i64,
events: &[crate::gitlab::types::GitLabLabelEvent],
) -> Result<()> {
let (issue_id, merge_request_id): (Option<i64>, Option<i64>) = match entity_type {
"issue" => (Some(entity_local_id), None),
"merge_request" => (None, Some(entity_local_id)),
_ => return Ok(()),
};
let mut stmt = tx.prepare_cached(
"INSERT OR REPLACE INTO resource_label_events
(gitlab_id, project_id, issue_id, merge_request_id, action,
label_name, actor_gitlab_id, actor_username, created_at)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)",
)?;
for event in events {
let created_at = crate::core::time::iso_to_ms_strict(&event.created_at)
.map_err(crate::core::error::LoreError::Other)?;
let actor_id = event.user.as_ref().map(|u| u.id);
let actor_username = event.user.as_ref().map(|u| u.username.as_str());
stmt.execute(rusqlite::params![
event.id,
project_id,
issue_id,
merge_request_id,
event.action,
event.label.name,
actor_id,
actor_username,
created_at,
])?;
}
Ok(())
}
/// Store milestone events within an existing transaction.
fn store_milestone_events_tx(
tx: &rusqlite::Transaction<'_>,
project_id: i64,
entity_type: &str,
entity_local_id: i64,
events: &[crate::gitlab::types::GitLabMilestoneEvent],
) -> Result<()> {
let (issue_id, merge_request_id): (Option<i64>, Option<i64>) = match entity_type {
"issue" => (Some(entity_local_id), None),
"merge_request" => (None, Some(entity_local_id)),
_ => return Ok(()),
};
let mut stmt = tx.prepare_cached(
"INSERT OR REPLACE INTO resource_milestone_events
(gitlab_id, project_id, issue_id, merge_request_id, action,
milestone_title, milestone_id, actor_gitlab_id, actor_username, created_at)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10)",
)?;
for event in events {
let created_at = crate::core::time::iso_to_ms_strict(&event.created_at)
.map_err(crate::core::error::LoreError::Other)?;
let actor_id = event.user.as_ref().map(|u| u.id);
let actor_username = event.user.as_ref().map(|u| u.username.as_str());
stmt.execute(rusqlite::params![
event.id,
project_id,
issue_id,
merge_request_id,
event.action,
event.milestone.title,
event.milestone.id,
actor_id,
actor_username,
created_at,
])?;
}
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
@@ -419,6 +826,8 @@ mod tests {
assert_eq!(result.notes_upserted, 0);
assert_eq!(result.issues_synced_discussions, 0);
assert_eq!(result.issues_skipped_discussion_sync, 0);
assert_eq!(result.resource_events_fetched, 0);
assert_eq!(result.resource_events_failed, 0);
}
#[test]
@@ -436,5 +845,28 @@ mod tests {
assert_eq!(result.diffnotes_count, 0);
assert_eq!(result.mrs_synced_discussions, 0);
assert_eq!(result.mrs_skipped_discussion_sync, 0);
assert_eq!(result.resource_events_fetched, 0);
assert_eq!(result.resource_events_failed, 0);
}
#[test]
fn drain_result_default_has_zero_counts() {
let result = DrainResult::default();
assert_eq!(result.fetched, 0);
assert_eq!(result.failed, 0);
}
#[test]
fn progress_event_resource_variants_exist() {
// Verify the new progress event variants are constructible
let _start = ProgressEvent::ResourceEventsFetchStarted { total: 10 };
let _progress = ProgressEvent::ResourceEventFetched {
current: 5,
total: 10,
};
let _complete = ProgressEvent::ResourceEventsFetchComplete {
fetched: 8,
failed: 2,
};
}
}