feat(orchestrator): Integrate closes_issues fetching and cross-ref extraction
Extends the MR ingestion pipeline to populate the entity_references table
from multiple sources:
1. Resource state events (extract_refs_from_state_events):
Called after draining the resource_events queue for both issues and MRs.
Extracts "closes" relationships from the structured API data.
2. System notes (extract_refs_from_system_notes):
Called during MR ingestion to parse "mentioned in" and "closed by"
patterns from discussion note bodies.
3. MR closes_issues API (new):
- enqueue_mr_closes_issues_jobs(): Queues jobs for all MRs
- drain_mr_closes_issues(): Fetches closes_issues for each MR
- Records cross-references with source_method='closes_issues_api'
New progress events:
- ClosesIssuesFetchStarted { total }
- ClosesIssueFetched { current, total }
- ClosesIssuesFetchComplete { fetched, failed }
New result fields on IngestMrProjectResult:
- closes_issues_fetched: Count of successful fetches
- closes_issues_failed: Count of failed fetches
The pipeline now comprehensively builds the relationship graph between
issues and MRs, enabling queries like "what will close this issue?"
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
@@ -1,10 +1,3 @@
|
|||||||
//! Ingestion orchestrator: coordinates issue/MR and discussion sync.
|
|
||||||
//!
|
|
||||||
//! Implements the canonical pattern:
|
|
||||||
//! 1. Fetch resources (issues or MRs) with cursor-based sync
|
|
||||||
//! 2. Identify resources needing discussion sync
|
|
||||||
//! 3. Execute discussion sync with parallel prefetch (fetch in parallel, write serially)
|
|
||||||
|
|
||||||
use futures::future::join_all;
|
use futures::future::join_all;
|
||||||
use rusqlite::Connection;
|
use rusqlite::Connection;
|
||||||
use tracing::{debug, info, instrument, warn};
|
use tracing::{debug, info, instrument, warn};
|
||||||
@@ -14,6 +7,9 @@ use crate::core::dependent_queue::{
|
|||||||
claim_jobs, complete_job, count_claimable_jobs, enqueue_job, fail_job, reclaim_stale_locks,
|
claim_jobs, complete_job, count_claimable_jobs, enqueue_job, fail_job, reclaim_stale_locks,
|
||||||
};
|
};
|
||||||
use crate::core::error::Result;
|
use crate::core::error::Result;
|
||||||
|
use crate::core::references::{
|
||||||
|
EntityReference, insert_entity_reference, resolve_issue_local_id, resolve_project_path,
|
||||||
|
};
|
||||||
use crate::gitlab::GitLabClient;
|
use crate::gitlab::GitLabClient;
|
||||||
|
|
||||||
use super::discussions::ingest_issue_discussions;
|
use super::discussions::ingest_issue_discussions;
|
||||||
@@ -23,45 +19,30 @@ use super::merge_requests::{
|
|||||||
};
|
};
|
||||||
use super::mr_discussions::{prefetch_mr_discussions, write_prefetched_mr_discussions};
|
use super::mr_discussions::{prefetch_mr_discussions, write_prefetched_mr_discussions};
|
||||||
|
|
||||||
/// Progress callback for ingestion operations.
|
|
||||||
pub type ProgressCallback = Box<dyn Fn(ProgressEvent) + Send + Sync>;
|
pub type ProgressCallback = Box<dyn Fn(ProgressEvent) + Send + Sync>;
|
||||||
|
|
||||||
/// Progress events emitted during ingestion.
|
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone)]
|
||||||
pub enum ProgressEvent {
|
pub enum ProgressEvent {
|
||||||
/// Issue fetching started
|
|
||||||
IssuesFetchStarted,
|
IssuesFetchStarted,
|
||||||
/// An issue was fetched (current count)
|
|
||||||
IssueFetched { count: usize },
|
IssueFetched { count: usize },
|
||||||
/// Issue fetching complete
|
|
||||||
IssuesFetchComplete { total: usize },
|
IssuesFetchComplete { total: usize },
|
||||||
/// Discussion sync started (total issues to sync)
|
|
||||||
DiscussionSyncStarted { total: usize },
|
DiscussionSyncStarted { total: usize },
|
||||||
/// Discussion synced for an issue (current/total)
|
|
||||||
DiscussionSynced { current: usize, total: usize },
|
DiscussionSynced { current: usize, total: usize },
|
||||||
/// Discussion sync complete
|
|
||||||
DiscussionSyncComplete,
|
DiscussionSyncComplete,
|
||||||
/// MR fetching started
|
|
||||||
MrsFetchStarted,
|
MrsFetchStarted,
|
||||||
/// An MR was fetched (current count)
|
|
||||||
MrFetched { count: usize },
|
MrFetched { count: usize },
|
||||||
/// MR fetching complete
|
|
||||||
MrsFetchComplete { total: usize },
|
MrsFetchComplete { total: usize },
|
||||||
/// MR discussion sync started (total MRs to sync)
|
|
||||||
MrDiscussionSyncStarted { total: usize },
|
MrDiscussionSyncStarted { total: usize },
|
||||||
/// MR discussion synced (current/total)
|
|
||||||
MrDiscussionSynced { current: usize, total: usize },
|
MrDiscussionSynced { current: usize, total: usize },
|
||||||
/// MR discussion sync complete
|
|
||||||
MrDiscussionSyncComplete,
|
MrDiscussionSyncComplete,
|
||||||
/// Resource event fetching started (total jobs)
|
|
||||||
ResourceEventsFetchStarted { total: usize },
|
ResourceEventsFetchStarted { total: usize },
|
||||||
/// Resource event fetched for an entity (current/total)
|
|
||||||
ResourceEventFetched { current: usize, total: usize },
|
ResourceEventFetched { current: usize, total: usize },
|
||||||
/// Resource event fetching complete
|
|
||||||
ResourceEventsFetchComplete { fetched: usize, failed: usize },
|
ResourceEventsFetchComplete { fetched: usize, failed: usize },
|
||||||
|
ClosesIssuesFetchStarted { total: usize },
|
||||||
|
ClosesIssueFetched { current: usize, total: usize },
|
||||||
|
ClosesIssuesFetchComplete { fetched: usize, failed: usize },
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Result of full project ingestion (issues).
|
|
||||||
#[derive(Debug, Default)]
|
#[derive(Debug, Default)]
|
||||||
pub struct IngestProjectResult {
|
pub struct IngestProjectResult {
|
||||||
pub issues_fetched: usize,
|
pub issues_fetched: usize,
|
||||||
@@ -76,7 +57,6 @@ pub struct IngestProjectResult {
|
|||||||
pub resource_events_failed: usize,
|
pub resource_events_failed: usize,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Result of MR ingestion for a project.
|
|
||||||
#[derive(Debug, Default)]
|
#[derive(Debug, Default)]
|
||||||
pub struct IngestMrProjectResult {
|
pub struct IngestMrProjectResult {
|
||||||
pub mrs_fetched: usize,
|
pub mrs_fetched: usize,
|
||||||
@@ -93,9 +73,10 @@ pub struct IngestMrProjectResult {
|
|||||||
pub mrs_skipped_discussion_sync: usize,
|
pub mrs_skipped_discussion_sync: usize,
|
||||||
pub resource_events_fetched: usize,
|
pub resource_events_fetched: usize,
|
||||||
pub resource_events_failed: usize,
|
pub resource_events_failed: usize,
|
||||||
|
pub closes_issues_fetched: usize,
|
||||||
|
pub closes_issues_failed: usize,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Ingest all issues and their discussions for a project.
|
|
||||||
pub async fn ingest_project_issues(
|
pub async fn ingest_project_issues(
|
||||||
conn: &Connection,
|
conn: &Connection,
|
||||||
client: &GitLabClient,
|
client: &GitLabClient,
|
||||||
@@ -107,7 +88,6 @@ pub async fn ingest_project_issues(
|
|||||||
.await
|
.await
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Ingest all issues and their discussions for a project with progress reporting.
|
|
||||||
#[instrument(
|
#[instrument(
|
||||||
skip(conn, client, config, progress),
|
skip(conn, client, config, progress),
|
||||||
fields(project_id, gitlab_project_id, items_processed, items_skipped, errors)
|
fields(project_id, gitlab_project_id, items_processed, items_skipped, errors)
|
||||||
@@ -127,7 +107,6 @@ pub async fn ingest_project_issues_with_progress(
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
// Step 1: Ingest issues
|
|
||||||
emit(ProgressEvent::IssuesFetchStarted);
|
emit(ProgressEvent::IssuesFetchStarted);
|
||||||
let issue_result = ingest_issues(conn, client, config, project_id, gitlab_project_id).await?;
|
let issue_result = ingest_issues(conn, client, config, project_id, gitlab_project_id).await?;
|
||||||
|
|
||||||
@@ -139,10 +118,8 @@ pub async fn ingest_project_issues_with_progress(
|
|||||||
total: result.issues_fetched,
|
total: result.issues_fetched,
|
||||||
});
|
});
|
||||||
|
|
||||||
// Step 2: Sync discussions for issues that need it
|
|
||||||
let issues_needing_sync = issue_result.issues_needing_discussion_sync;
|
let issues_needing_sync = issue_result.issues_needing_discussion_sync;
|
||||||
|
|
||||||
// Query actual total issues for accurate skip count (issues_upserted only counts this run)
|
|
||||||
let total_issues: i64 = conn
|
let total_issues: i64 = conn
|
||||||
.query_row(
|
.query_row(
|
||||||
"SELECT COUNT(*) FROM issues WHERE project_id = ?",
|
"SELECT COUNT(*) FROM issues WHERE project_id = ?",
|
||||||
@@ -153,7 +130,6 @@ pub async fn ingest_project_issues_with_progress(
|
|||||||
let total_issues = total_issues as usize;
|
let total_issues = total_issues as usize;
|
||||||
result.issues_skipped_discussion_sync = total_issues.saturating_sub(issues_needing_sync.len());
|
result.issues_skipped_discussion_sync = total_issues.saturating_sub(issues_needing_sync.len());
|
||||||
|
|
||||||
// Step 3: Sync discussions for issues that need it
|
|
||||||
if issues_needing_sync.is_empty() {
|
if issues_needing_sync.is_empty() {
|
||||||
info!("No issues need discussion sync");
|
info!("No issues need discussion sync");
|
||||||
} else {
|
} else {
|
||||||
@@ -166,7 +142,6 @@ pub async fn ingest_project_issues_with_progress(
|
|||||||
total: issues_needing_sync.len(),
|
total: issues_needing_sync.len(),
|
||||||
});
|
});
|
||||||
|
|
||||||
// Execute sequential discussion sync (see function doc for why not concurrent)
|
|
||||||
let discussion_results = sync_discussions_sequential(
|
let discussion_results = sync_discussions_sequential(
|
||||||
conn,
|
conn,
|
||||||
client,
|
client,
|
||||||
@@ -180,7 +155,6 @@ pub async fn ingest_project_issues_with_progress(
|
|||||||
|
|
||||||
emit(ProgressEvent::DiscussionSyncComplete);
|
emit(ProgressEvent::DiscussionSyncComplete);
|
||||||
|
|
||||||
// Aggregate discussion results
|
|
||||||
for disc_result in discussion_results {
|
for disc_result in discussion_results {
|
||||||
result.discussions_fetched += disc_result.discussions_fetched;
|
result.discussions_fetched += disc_result.discussions_fetched;
|
||||||
result.discussions_upserted += disc_result.discussions_upserted;
|
result.discussions_upserted += disc_result.discussions_upserted;
|
||||||
@@ -189,15 +163,12 @@ pub async fn ingest_project_issues_with_progress(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Step 4: Enqueue and drain resource events (if enabled)
|
|
||||||
if config.sync.fetch_resource_events {
|
if config.sync.fetch_resource_events {
|
||||||
// Enqueue resource_events jobs for all issues in this project
|
|
||||||
let enqueued = enqueue_resource_events_for_entity_type(conn, project_id, "issue")?;
|
let enqueued = enqueue_resource_events_for_entity_type(conn, project_id, "issue")?;
|
||||||
if enqueued > 0 {
|
if enqueued > 0 {
|
||||||
debug!(enqueued, "Enqueued resource events jobs for issues");
|
debug!(enqueued, "Enqueued resource events jobs for issues");
|
||||||
}
|
}
|
||||||
|
|
||||||
// Drain the queue
|
|
||||||
let drain_result = drain_resource_events(
|
let drain_result = drain_resource_events(
|
||||||
conn,
|
conn,
|
||||||
client,
|
client,
|
||||||
@@ -209,6 +180,15 @@ pub async fn ingest_project_issues_with_progress(
|
|||||||
.await?;
|
.await?;
|
||||||
result.resource_events_fetched = drain_result.fetched;
|
result.resource_events_fetched = drain_result.fetched;
|
||||||
result.resource_events_failed = drain_result.failed;
|
result.resource_events_failed = drain_result.failed;
|
||||||
|
|
||||||
|
let refs_inserted =
|
||||||
|
crate::core::references::extract_refs_from_state_events(conn, project_id)?;
|
||||||
|
if refs_inserted > 0 {
|
||||||
|
debug!(
|
||||||
|
refs_inserted,
|
||||||
|
"Extracted cross-references from state events"
|
||||||
|
);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
info!(
|
info!(
|
||||||
@@ -231,12 +211,6 @@ pub async fn ingest_project_issues_with_progress(
|
|||||||
Ok(result)
|
Ok(result)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Sync discussions sequentially for each issue.
|
|
||||||
///
|
|
||||||
/// NOTE: Despite the config having `dependent_concurrency`, we process sequentially
|
|
||||||
/// because rusqlite's `Connection` is not `Send` and cannot be shared across tasks.
|
|
||||||
/// True concurrency would require connection pooling (r2d2, deadpool, etc.).
|
|
||||||
/// The batch_size from config is used for progress logging granularity.
|
|
||||||
async fn sync_discussions_sequential(
|
async fn sync_discussions_sequential(
|
||||||
conn: &Connection,
|
conn: &Connection,
|
||||||
client: &GitLabClient,
|
client: &GitLabClient,
|
||||||
@@ -251,7 +225,6 @@ async fn sync_discussions_sequential(
|
|||||||
|
|
||||||
let mut results = Vec::with_capacity(issues.len());
|
let mut results = Vec::with_capacity(issues.len());
|
||||||
|
|
||||||
// Process in batches for progress feedback (actual processing is sequential)
|
|
||||||
for chunk in issues.chunks(batch_size) {
|
for chunk in issues.chunks(batch_size) {
|
||||||
for issue in chunk {
|
for issue in chunk {
|
||||||
let disc_result = ingest_issue_discussions(
|
let disc_result = ingest_issue_discussions(
|
||||||
@@ -265,7 +238,6 @@ async fn sync_discussions_sequential(
|
|||||||
.await?;
|
.await?;
|
||||||
results.push(disc_result);
|
results.push(disc_result);
|
||||||
|
|
||||||
// Emit progress
|
|
||||||
if let Some(cb) = progress {
|
if let Some(cb) = progress {
|
||||||
cb(ProgressEvent::DiscussionSynced {
|
cb(ProgressEvent::DiscussionSynced {
|
||||||
current: results.len(),
|
current: results.len(),
|
||||||
@@ -278,7 +250,6 @@ async fn sync_discussions_sequential(
|
|||||||
Ok(results)
|
Ok(results)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Ingest all merge requests and their discussions for a project.
|
|
||||||
pub async fn ingest_project_merge_requests(
|
pub async fn ingest_project_merge_requests(
|
||||||
conn: &Connection,
|
conn: &Connection,
|
||||||
client: &GitLabClient,
|
client: &GitLabClient,
|
||||||
@@ -299,7 +270,6 @@ pub async fn ingest_project_merge_requests(
|
|||||||
.await
|
.await
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Ingest all merge requests and their discussions for a project with progress reporting.
|
|
||||||
#[instrument(
|
#[instrument(
|
||||||
skip(conn, client, config, progress),
|
skip(conn, client, config, progress),
|
||||||
fields(project_id, gitlab_project_id, items_processed, items_skipped, errors)
|
fields(project_id, gitlab_project_id, items_processed, items_skipped, errors)
|
||||||
@@ -320,7 +290,6 @@ pub async fn ingest_project_merge_requests_with_progress(
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
// Step 1: Ingest MRs
|
|
||||||
emit(ProgressEvent::MrsFetchStarted);
|
emit(ProgressEvent::MrsFetchStarted);
|
||||||
let mr_result = ingest_merge_requests(
|
let mr_result = ingest_merge_requests(
|
||||||
conn,
|
conn,
|
||||||
@@ -342,11 +311,8 @@ pub async fn ingest_project_merge_requests_with_progress(
|
|||||||
total: result.mrs_fetched,
|
total: result.mrs_fetched,
|
||||||
});
|
});
|
||||||
|
|
||||||
// Step 2: Query DB for MRs needing discussion sync
|
|
||||||
// CRITICAL: Query AFTER ingestion to avoid memory growth during large ingests
|
|
||||||
let mrs_needing_sync = get_mrs_needing_discussion_sync(conn, project_id)?;
|
let mrs_needing_sync = get_mrs_needing_discussion_sync(conn, project_id)?;
|
||||||
|
|
||||||
// Query total MRs for accurate skip count
|
|
||||||
let total_mrs: i64 = conn
|
let total_mrs: i64 = conn
|
||||||
.query_row(
|
.query_row(
|
||||||
"SELECT COUNT(*) FROM merge_requests WHERE project_id = ?",
|
"SELECT COUNT(*) FROM merge_requests WHERE project_id = ?",
|
||||||
@@ -357,7 +323,6 @@ pub async fn ingest_project_merge_requests_with_progress(
|
|||||||
let total_mrs = total_mrs as usize;
|
let total_mrs = total_mrs as usize;
|
||||||
result.mrs_skipped_discussion_sync = total_mrs.saturating_sub(mrs_needing_sync.len());
|
result.mrs_skipped_discussion_sync = total_mrs.saturating_sub(mrs_needing_sync.len());
|
||||||
|
|
||||||
// Step 3: Sync discussions for MRs that need it
|
|
||||||
if mrs_needing_sync.is_empty() {
|
if mrs_needing_sync.is_empty() {
|
||||||
info!("No MRs need discussion sync");
|
info!("No MRs need discussion sync");
|
||||||
} else {
|
} else {
|
||||||
@@ -370,7 +335,6 @@ pub async fn ingest_project_merge_requests_with_progress(
|
|||||||
total: mrs_needing_sync.len(),
|
total: mrs_needing_sync.len(),
|
||||||
});
|
});
|
||||||
|
|
||||||
// Execute sequential MR discussion sync
|
|
||||||
let discussion_results = sync_mr_discussions_sequential(
|
let discussion_results = sync_mr_discussions_sequential(
|
||||||
conn,
|
conn,
|
||||||
client,
|
client,
|
||||||
@@ -384,7 +348,6 @@ pub async fn ingest_project_merge_requests_with_progress(
|
|||||||
|
|
||||||
emit(ProgressEvent::MrDiscussionSyncComplete);
|
emit(ProgressEvent::MrDiscussionSyncComplete);
|
||||||
|
|
||||||
// Aggregate discussion results
|
|
||||||
for disc_result in discussion_results {
|
for disc_result in discussion_results {
|
||||||
result.discussions_fetched += disc_result.discussions_fetched;
|
result.discussions_fetched += disc_result.discussions_fetched;
|
||||||
result.discussions_upserted += disc_result.discussions_upserted;
|
result.discussions_upserted += disc_result.discussions_upserted;
|
||||||
@@ -397,7 +360,6 @@ pub async fn ingest_project_merge_requests_with_progress(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Step 4: Enqueue and drain resource events (if enabled)
|
|
||||||
if config.sync.fetch_resource_events {
|
if config.sync.fetch_resource_events {
|
||||||
let enqueued = enqueue_resource_events_for_entity_type(conn, project_id, "merge_request")?;
|
let enqueued = enqueue_resource_events_for_entity_type(conn, project_id, "merge_request")?;
|
||||||
if enqueued > 0 {
|
if enqueued > 0 {
|
||||||
@@ -415,6 +377,44 @@ pub async fn ingest_project_merge_requests_with_progress(
|
|||||||
.await?;
|
.await?;
|
||||||
result.resource_events_fetched = drain_result.fetched;
|
result.resource_events_fetched = drain_result.fetched;
|
||||||
result.resource_events_failed = drain_result.failed;
|
result.resource_events_failed = drain_result.failed;
|
||||||
|
|
||||||
|
let refs_inserted =
|
||||||
|
crate::core::references::extract_refs_from_state_events(conn, project_id)?;
|
||||||
|
if refs_inserted > 0 {
|
||||||
|
debug!(
|
||||||
|
refs_inserted,
|
||||||
|
"Extracted cross-references from state events"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let note_refs = crate::core::note_parser::extract_refs_from_system_notes(conn, project_id)?;
|
||||||
|
if note_refs.inserted > 0 || note_refs.skipped_unresolvable > 0 {
|
||||||
|
debug!(
|
||||||
|
inserted = note_refs.inserted,
|
||||||
|
unresolvable = note_refs.skipped_unresolvable,
|
||||||
|
parse_failures = note_refs.parse_failures,
|
||||||
|
"Extracted cross-references from system notes (MRs)"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
let enqueued = enqueue_mr_closes_issues_jobs(conn, project_id)?;
|
||||||
|
if enqueued > 0 {
|
||||||
|
debug!(enqueued, "Enqueued mr_closes_issues jobs");
|
||||||
|
}
|
||||||
|
|
||||||
|
let closes_result = drain_mr_closes_issues(
|
||||||
|
conn,
|
||||||
|
client,
|
||||||
|
config,
|
||||||
|
project_id,
|
||||||
|
gitlab_project_id,
|
||||||
|
&progress,
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
result.closes_issues_fetched = closes_result.fetched;
|
||||||
|
result.closes_issues_failed = closes_result.failed;
|
||||||
}
|
}
|
||||||
|
|
||||||
info!(
|
info!(
|
||||||
@@ -428,6 +428,8 @@ pub async fn ingest_project_merge_requests_with_progress(
|
|||||||
mrs_skipped = result.mrs_skipped_discussion_sync,
|
mrs_skipped = result.mrs_skipped_discussion_sync,
|
||||||
resource_events_fetched = result.resource_events_fetched,
|
resource_events_fetched = result.resource_events_fetched,
|
||||||
resource_events_failed = result.resource_events_failed,
|
resource_events_failed = result.resource_events_failed,
|
||||||
|
closes_issues_fetched = result.closes_issues_fetched,
|
||||||
|
closes_issues_failed = result.closes_issues_failed,
|
||||||
"MR project ingestion complete"
|
"MR project ingestion complete"
|
||||||
);
|
);
|
||||||
|
|
||||||
@@ -438,10 +440,6 @@ pub async fn ingest_project_merge_requests_with_progress(
|
|||||||
Ok(result)
|
Ok(result)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Sync discussions for MRs with parallel API prefetching.
|
|
||||||
///
|
|
||||||
/// Pattern: Fetch discussions for multiple MRs in parallel, then write serially.
|
|
||||||
/// This overlaps network I/O while respecting rusqlite's single-connection constraint.
|
|
||||||
async fn sync_mr_discussions_sequential(
|
async fn sync_mr_discussions_sequential(
|
||||||
conn: &Connection,
|
conn: &Connection,
|
||||||
client: &GitLabClient,
|
client: &GitLabClient,
|
||||||
@@ -457,22 +455,18 @@ async fn sync_mr_discussions_sequential(
|
|||||||
let mut results = Vec::with_capacity(mrs.len());
|
let mut results = Vec::with_capacity(mrs.len());
|
||||||
let mut processed = 0;
|
let mut processed = 0;
|
||||||
|
|
||||||
// Process in batches: parallel API fetch, serial DB write
|
|
||||||
for chunk in mrs.chunks(batch_size) {
|
for chunk in mrs.chunks(batch_size) {
|
||||||
// Step 1: Prefetch discussions for all MRs in this batch in parallel
|
|
||||||
let prefetch_futures = chunk.iter().map(|mr| {
|
let prefetch_futures = chunk.iter().map(|mr| {
|
||||||
prefetch_mr_discussions(client, gitlab_project_id, local_project_id, mr.clone())
|
prefetch_mr_discussions(client, gitlab_project_id, local_project_id, mr.clone())
|
||||||
});
|
});
|
||||||
let prefetched_batch = join_all(prefetch_futures).await;
|
let prefetched_batch = join_all(prefetch_futures).await;
|
||||||
|
|
||||||
// Step 2: Write each prefetched result serially
|
|
||||||
for prefetched in prefetched_batch {
|
for prefetched in prefetched_batch {
|
||||||
let disc_result =
|
let disc_result =
|
||||||
write_prefetched_mr_discussions(conn, config, local_project_id, prefetched)?;
|
write_prefetched_mr_discussions(conn, config, local_project_id, prefetched)?;
|
||||||
results.push(disc_result);
|
results.push(disc_result);
|
||||||
processed += 1;
|
processed += 1;
|
||||||
|
|
||||||
// Emit progress
|
|
||||||
if let Some(cb) = progress {
|
if let Some(cb) = progress {
|
||||||
cb(ProgressEvent::MrDiscussionSynced {
|
cb(ProgressEvent::MrDiscussionSynced {
|
||||||
current: processed,
|
current: processed,
|
||||||
@@ -485,7 +479,6 @@ async fn sync_mr_discussions_sequential(
|
|||||||
Ok(results)
|
Ok(results)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Result of draining the resource events queue.
|
|
||||||
#[derive(Debug, Default)]
|
#[derive(Debug, Default)]
|
||||||
pub struct DrainResult {
|
pub struct DrainResult {
|
||||||
pub fetched: usize,
|
pub fetched: usize,
|
||||||
@@ -493,21 +486,11 @@ pub struct DrainResult {
|
|||||||
pub skipped_not_found: usize,
|
pub skipped_not_found: usize,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Enqueue resource_events jobs for all entities of a given type in a project.
|
|
||||||
///
|
|
||||||
/// Uses the pending_dependent_fetches queue. Jobs are deduplicated by the UNIQUE
|
|
||||||
/// constraint, so re-enqueueing the same entity is a no-op.
|
|
||||||
fn enqueue_resource_events_for_entity_type(
|
fn enqueue_resource_events_for_entity_type(
|
||||||
conn: &Connection,
|
conn: &Connection,
|
||||||
project_id: i64,
|
project_id: i64,
|
||||||
entity_type: &str,
|
entity_type: &str,
|
||||||
) -> Result<usize> {
|
) -> Result<usize> {
|
||||||
// Clean up obsolete jobs: remove resource_events jobs for entities whose
|
|
||||||
// watermark is already current (updated_at <= resource_events_synced_for_updated_at).
|
|
||||||
// These are leftover from prior runs that failed after watermark-stamping but
|
|
||||||
// before job deletion, or from entities that no longer need syncing.
|
|
||||||
// We intentionally keep jobs for entities that still need syncing (including
|
|
||||||
// in-progress or failed-with-backoff jobs) to preserve retry state.
|
|
||||||
match entity_type {
|
match entity_type {
|
||||||
"issue" => {
|
"issue" => {
|
||||||
conn.execute(
|
conn.execute(
|
||||||
@@ -536,10 +519,6 @@ fn enqueue_resource_events_for_entity_type(
|
|||||||
_ => {}
|
_ => {}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Enqueue resource_events jobs only for entities whose updated_at exceeds
|
|
||||||
// their last resource event sync watermark.
|
|
||||||
//
|
|
||||||
// Use separate hardcoded queries per entity type to avoid format!-based SQL.
|
|
||||||
let entities: Vec<(i64, i64)> = match entity_type {
|
let entities: Vec<(i64, i64)> = match entity_type {
|
||||||
"issue" => {
|
"issue" => {
|
||||||
let mut stmt = conn.prepare_cached(
|
let mut stmt = conn.prepare_cached(
|
||||||
@@ -580,10 +559,6 @@ fn enqueue_resource_events_for_entity_type(
|
|||||||
Ok(enqueued)
|
Ok(enqueued)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Drain pending resource_events jobs: claim, fetch from GitLab, store, complete/fail.
|
|
||||||
///
|
|
||||||
/// Processes jobs sequentially since `rusqlite::Connection` is not `Send`.
|
|
||||||
/// Uses exponential backoff on failure via `fail_job`.
|
|
||||||
#[instrument(
|
#[instrument(
|
||||||
skip(conn, client, config, progress),
|
skip(conn, client, config, progress),
|
||||||
fields(project_id, gitlab_project_id, items_processed, errors)
|
fields(project_id, gitlab_project_id, items_processed, errors)
|
||||||
@@ -599,16 +574,11 @@ async fn drain_resource_events(
|
|||||||
let mut result = DrainResult::default();
|
let mut result = DrainResult::default();
|
||||||
let batch_size = config.sync.dependent_concurrency as usize;
|
let batch_size = config.sync.dependent_concurrency as usize;
|
||||||
|
|
||||||
// Reclaim stale locks from crashed processes
|
|
||||||
let reclaimed = reclaim_stale_locks(conn, config.sync.stale_lock_minutes)?;
|
let reclaimed = reclaim_stale_locks(conn, config.sync.stale_lock_minutes)?;
|
||||||
if reclaimed > 0 {
|
if reclaimed > 0 {
|
||||||
info!(reclaimed, "Reclaimed stale resource event locks");
|
info!(reclaimed, "Reclaimed stale resource event locks");
|
||||||
}
|
}
|
||||||
|
|
||||||
// Count only claimable jobs (unlocked, past retry backoff) for accurate progress.
|
|
||||||
// Using count_pending_jobs here would inflate the total with locked/backing-off
|
|
||||||
// jobs that can't be claimed in this drain run, causing the progress bar to
|
|
||||||
// never reach 100%.
|
|
||||||
let claimable_counts = count_claimable_jobs(conn, project_id)?;
|
let claimable_counts = count_claimable_jobs(conn, project_id)?;
|
||||||
let total_pending = claimable_counts
|
let total_pending = claimable_counts
|
||||||
.get("resource_events")
|
.get("resource_events")
|
||||||
@@ -638,14 +608,9 @@ async fn drain_resource_events(
|
|||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Track whether any job in this batch was actually new. If every
|
|
||||||
// claimed job was already seen, break to avoid an infinite loop
|
|
||||||
// (can happen with clock skew or zero-backoff edge cases).
|
|
||||||
let mut any_new_in_batch = false;
|
let mut any_new_in_batch = false;
|
||||||
|
|
||||||
for job in &jobs {
|
for job in &jobs {
|
||||||
// Guard against re-processing a job that was failed and re-claimed
|
|
||||||
// within the same drain run.
|
|
||||||
if !seen_job_ids.insert(job.id) {
|
if !seen_job_ids.insert(job.id) {
|
||||||
warn!(
|
warn!(
|
||||||
job_id = job.id,
|
job_id = job.id,
|
||||||
@@ -693,10 +658,6 @@ async fn drain_resource_events(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
// Only 404 (not found) is truly permanent -- the resource
|
|
||||||
// events endpoint doesn't exist for this entity. Stamp the
|
|
||||||
// watermark so we skip it next run. All other errors
|
|
||||||
// (403, auth, network) get backoff retry.
|
|
||||||
if e.is_permanent_api_error() {
|
if e.is_permanent_api_error() {
|
||||||
debug!(
|
debug!(
|
||||||
entity_type = %job.entity_type,
|
entity_type = %job.entity_type,
|
||||||
@@ -731,7 +692,6 @@ async fn drain_resource_events(
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
// If every job in this batch was already seen, stop to prevent spinning.
|
|
||||||
if !any_new_in_batch {
|
if !any_new_in_batch {
|
||||||
warn!("All claimed jobs were already processed, breaking drain loop");
|
warn!("All claimed jobs were already processed, breaking drain loop");
|
||||||
break;
|
break;
|
||||||
@@ -757,9 +717,6 @@ async fn drain_resource_events(
|
|||||||
Ok(result)
|
Ok(result)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Store fetched resource events in the database.
|
|
||||||
///
|
|
||||||
/// Wraps all three event types in a single transaction for atomicity.
|
|
||||||
fn store_resource_events(
|
fn store_resource_events(
|
||||||
conn: &Connection,
|
conn: &Connection,
|
||||||
project_id: i64,
|
project_id: i64,
|
||||||
@@ -805,10 +762,6 @@ fn store_resource_events(
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Update the resource event watermark for an entity after successful event fetch.
|
|
||||||
///
|
|
||||||
/// Sets `resource_events_synced_for_updated_at = updated_at` so the entity
|
|
||||||
/// won't be re-enqueued until its `updated_at` advances again.
|
|
||||||
fn update_resource_event_watermark(
|
fn update_resource_event_watermark(
|
||||||
conn: &Connection,
|
conn: &Connection,
|
||||||
entity_type: &str,
|
entity_type: &str,
|
||||||
@@ -832,6 +785,209 @@ fn update_resource_event_watermark(
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn enqueue_mr_closes_issues_jobs(conn: &Connection, project_id: i64) -> Result<usize> {
|
||||||
|
let mut stmt =
|
||||||
|
conn.prepare_cached("SELECT id, iid FROM merge_requests WHERE project_id = ?1")?;
|
||||||
|
let entities: Vec<(i64, i64)> = stmt
|
||||||
|
.query_map([project_id], |row| Ok((row.get(0)?, row.get(1)?)))?
|
||||||
|
.collect::<std::result::Result<Vec<_>, _>>()?;
|
||||||
|
|
||||||
|
let mut enqueued = 0;
|
||||||
|
for (local_id, iid) in &entities {
|
||||||
|
if enqueue_job(
|
||||||
|
conn,
|
||||||
|
project_id,
|
||||||
|
"merge_request",
|
||||||
|
*iid,
|
||||||
|
*local_id,
|
||||||
|
"mr_closes_issues",
|
||||||
|
None,
|
||||||
|
)? {
|
||||||
|
enqueued += 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(enqueued)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Drain the `mr_closes_issues` dependent-job queue for one project.
///
/// For each queued MR, fetches the issues that MR will close from the GitLab
/// `closes_issues` API and records them as entity references via
/// `store_closes_issues_refs`. Emits `ClosesIssues*` progress events and
/// returns per-outcome counts in `DrainResult`.
///
/// Error handling per job:
/// - store failure or transient fetch failure -> `fail_job` (retryable), `failed += 1`
/// - permanent API error (e.g. 404) -> `complete_job` so it is never retried,
///   counted in `skipped_not_found`
///
/// The `seen_job_ids` set plus the `any_new_in_batch` flag guard against an
/// infinite loop if `claim_jobs` keeps returning jobs that were already
/// handled in this drain (e.g. a lock/complete bookkeeping bug).
#[instrument(
    skip(conn, client, config, progress),
    fields(project_id, gitlab_project_id, items_processed, errors)
)]
async fn drain_mr_closes_issues(
    conn: &Connection,
    client: &GitLabClient,
    config: &Config,
    project_id: i64,
    gitlab_project_id: i64,
    progress: &Option<ProgressCallback>,
) -> Result<DrainResult> {
    let mut result = DrainResult::default();
    // Batch size reuses the generic dependent-fetch concurrency setting.
    let batch_size = config.sync.dependent_concurrency as usize;

    // Recover jobs whose locks were abandoned by a crashed/interrupted run.
    let reclaimed = reclaim_stale_locks(conn, config.sync.stale_lock_minutes)?;
    if reclaimed > 0 {
        info!(reclaimed, "Reclaimed stale mr_closes_issues locks");
    }

    // Snapshot the pending total up front so progress events have a stable
    // denominator for this drain pass.
    let claimable_counts = count_claimable_jobs(conn, project_id)?;
    let total_pending = claimable_counts
        .get("mr_closes_issues")
        .copied()
        .unwrap_or(0);

    if total_pending == 0 {
        return Ok(result);
    }

    // Small helper: forward a progress event only if a callback was supplied.
    let emit = |event: ProgressEvent| {
        if let Some(cb) = progress {
            cb(event);
        }
    };

    emit(ProgressEvent::ClosesIssuesFetchStarted {
        total: total_pending,
    });

    let mut processed = 0;
    // Tracks job ids handled in THIS drain pass; see loop-guard note above.
    let mut seen_job_ids = std::collections::HashSet::new();

    loop {
        let jobs = claim_jobs(conn, "mr_closes_issues", project_id, batch_size)?;
        if jobs.is_empty() {
            break;
        }

        let mut any_new_in_batch = false;

        for job in &jobs {
            // `insert` returns false if the id was already present, i.e. we
            // have already processed this job in this pass.
            if !seen_job_ids.insert(job.id) {
                warn!(
                    job_id = job.id,
                    "Skipping already-processed mr_closes_issues job"
                );
                continue;
            }
            any_new_in_batch = true;

            match client
                .fetch_mr_closes_issues(gitlab_project_id, job.entity_iid)
                .await
            {
                Ok(closes_issues) => {
                    let store_result = store_closes_issues_refs(
                        conn,
                        project_id,
                        job.entity_local_id,
                        &closes_issues,
                    );

                    match store_result {
                        Ok(()) => {
                            complete_job(conn, job.id)?;
                            result.fetched += 1;
                        }
                        Err(e) => {
                            // DB-side failure: leave the job retryable.
                            warn!(
                                entity_iid = job.entity_iid,
                                error = %e,
                                "Failed to store closes_issues references"
                            );
                            fail_job(conn, job.id, &e.to_string())?;
                            result.failed += 1;
                        }
                    }
                }
                Err(e) => {
                    if e.is_permanent_api_error() {
                        // Retrying cannot succeed (e.g. MR gone); mark the job
                        // complete so the queue does not churn on it forever.
                        debug!(
                            entity_iid = job.entity_iid,
                            error = %e,
                            "Permanent API error for closes_issues, marking complete"
                        );
                        complete_job(conn, job.id)?;
                        result.skipped_not_found += 1;
                    } else {
                        // Transient (network/rate-limit/etc.): record failure,
                        // job remains eligible for a later retry.
                        warn!(
                            entity_iid = job.entity_iid,
                            error = %e,
                            "Failed to fetch closes_issues from GitLab"
                        );
                        fail_job(conn, job.id, &e.to_string())?;
                        result.failed += 1;
                    }
                }
            }

            processed += 1;
            emit(ProgressEvent::ClosesIssueFetched {
                current: processed,
                total: total_pending,
            });
        }

        // Every claimed job was a repeat: claim/complete bookkeeping is
        // misbehaving, so bail out instead of spinning forever.
        if !any_new_in_batch {
            warn!("All claimed mr_closes_issues jobs were already processed, breaking drain loop");
            break;
        }
    }

    emit(ProgressEvent::ClosesIssuesFetchComplete {
        fetched: result.fetched,
        failed: result.failed,
    });

    if result.fetched > 0 || result.failed > 0 {
        info!(
            fetched = result.fetched,
            failed = result.failed,
            "mr_closes_issues drain complete"
        );
    }

    // Backfill the span fields declared in #[instrument] above.
    tracing::Span::current().record("items_processed", result.fetched);
    tracing::Span::current().record("errors", result.failed);

    Ok(result)
}
|
||||||
|
|
||||||
|
fn store_closes_issues_refs(
|
||||||
|
conn: &Connection,
|
||||||
|
project_id: i64,
|
||||||
|
mr_local_id: i64,
|
||||||
|
closes_issues: &[crate::gitlab::types::GitLabIssueRef],
|
||||||
|
) -> Result<()> {
|
||||||
|
for issue_ref in closes_issues {
|
||||||
|
let target_local_id = resolve_issue_local_id(conn, project_id, issue_ref.iid)?;
|
||||||
|
|
||||||
|
let (target_id, target_path, target_iid) = if let Some(local_id) = target_local_id {
|
||||||
|
(Some(local_id), None, None)
|
||||||
|
} else {
|
||||||
|
let path = resolve_project_path(conn, issue_ref.project_id)?;
|
||||||
|
let fallback =
|
||||||
|
path.unwrap_or_else(|| format!("gitlab_project:{}", issue_ref.project_id));
|
||||||
|
(None, Some(fallback), Some(issue_ref.iid))
|
||||||
|
};
|
||||||
|
|
||||||
|
let ref_ = EntityReference {
|
||||||
|
project_id,
|
||||||
|
source_entity_type: "merge_request",
|
||||||
|
source_entity_id: mr_local_id,
|
||||||
|
target_entity_type: "issue",
|
||||||
|
target_entity_id: target_id,
|
||||||
|
target_project_path: target_path.as_deref(),
|
||||||
|
target_entity_iid: target_iid,
|
||||||
|
reference_type: "closes",
|
||||||
|
source_method: "api",
|
||||||
|
};
|
||||||
|
|
||||||
|
insert_entity_reference(conn, &ref_)?;
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use super::*;
|
use super::*;
|
||||||
@@ -879,7 +1035,6 @@ mod tests {
|
|||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn progress_event_resource_variants_exist() {
|
fn progress_event_resource_variants_exist() {
|
||||||
// Verify the new progress event variants are constructible
|
|
||||||
let _start = ProgressEvent::ResourceEventsFetchStarted { total: 10 };
|
let _start = ProgressEvent::ResourceEventsFetchStarted { total: 10 };
|
||||||
let _progress = ProgressEvent::ResourceEventFetched {
|
let _progress = ProgressEvent::ResourceEventFetched {
|
||||||
current: 5,
|
current: 5,
|
||||||
|
|||||||
Reference in New Issue
Block a user