Files
gitlore/src/ingestion/orchestrator.rs
Taylor Eernisse d3306114eb fix(ingestion): pass ShutdownSignal into issue and MR pagination loops
The orchestrator already accepted a ShutdownSignal but only checked it
between phases (after all issues fetched, before discussions). The inner
loops in ingest_issues() and ingest_merge_requests() consumed entire
paginated streams without checking for cancellation.

On a large initial sync (thousands of issues/MRs), Ctrl+C could be
unresponsive for minutes while the current entity type finished draining.

Now both functions accept &ShutdownSignal and check is_cancelled() at
the top of each iteration, breaking out promptly and committing the
cursor for whatever was already processed.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-08 07:55:36 -05:00

1249 lines
38 KiB
Rust

use futures::future::join_all;
use rusqlite::Connection;
use tracing::{debug, info, instrument, warn};
use crate::Config;
use crate::core::dependent_queue::{
claim_jobs, complete_job_tx, count_claimable_jobs, enqueue_job, fail_job, reclaim_stale_locks,
};
use crate::core::error::Result;
use crate::core::references::{
EntityReference, insert_entity_reference, resolve_issue_local_id, resolve_project_path,
};
use crate::core::shutdown::ShutdownSignal;
use crate::gitlab::GitLabClient;
use super::discussions::ingest_issue_discussions;
use super::issues::{IssueForDiscussionSync, ingest_issues};
use super::merge_requests::{
MrForDiscussionSync, get_mrs_needing_discussion_sync, ingest_merge_requests,
};
use super::mr_discussions::{prefetch_mr_discussions, write_prefetched_mr_discussions};
/// Callback invoked once per [`ProgressEvent`]; boxed and `Send + Sync` so
/// it can be handed into async ingestion code.
pub type ProgressCallback = Box<dyn Fn(ProgressEvent) + Send + Sync>;

/// Milestones emitted during project ingestion so a caller (e.g. a CLI
/// progress bar) can track each phase: issue fetch, issue discussion sync,
/// MR fetch, MR discussion sync, resource events, and closes-issues refs.
#[derive(Debug, Clone)]
pub enum ProgressEvent {
    IssuesFetchStarted,
    /// Running count of issues fetched so far.
    IssueFetched { count: usize },
    IssuesFetchComplete { total: usize },
    DiscussionSyncStarted { total: usize },
    /// `current` out of `total` issues have had discussions synced.
    DiscussionSynced { current: usize, total: usize },
    DiscussionSyncComplete,
    MrsFetchStarted,
    MrFetched { count: usize },
    MrsFetchComplete { total: usize },
    MrDiscussionSyncStarted { total: usize },
    MrDiscussionSynced { current: usize, total: usize },
    MrDiscussionSyncComplete,
    ResourceEventsFetchStarted { total: usize },
    ResourceEventFetched { current: usize, total: usize },
    /// Terminal event for the resource-events drain with final tallies.
    ResourceEventsFetchComplete { fetched: usize, failed: usize },
    ClosesIssuesFetchStarted { total: usize },
    ClosesIssueFetched { current: usize, total: usize },
    ClosesIssuesFetchComplete { fetched: usize, failed: usize },
}
/// Aggregate counters for one project's issue-ingestion run, covering the
/// issue fetch, discussion sync, and resource-events phases. May hold
/// partial totals if ingestion was cancelled mid-run.
#[derive(Debug, Default)]
pub struct IngestProjectResult {
    /// Issues returned by the paginated fetch this run.
    pub issues_fetched: usize,
    /// Issues inserted or updated in the local DB.
    pub issues_upserted: usize,
    /// Labels newly created while upserting issues.
    pub labels_created: usize,
    /// Discussions fetched across all synced issues.
    pub discussions_fetched: usize,
    /// Discussions inserted or updated locally.
    pub discussions_upserted: usize,
    /// Notes inserted or updated locally.
    pub notes_upserted: usize,
    /// Issues whose discussions were synced this run.
    pub issues_synced_discussions: usize,
    /// Issues in the DB that did not need a discussion sync.
    pub issues_skipped_discussion_sync: usize,
    /// Resource-event jobs drained successfully.
    pub resource_events_fetched: usize,
    /// Resource-event jobs that failed (fetch or store).
    pub resource_events_failed: usize,
}
/// Aggregate counters for one project's merge-request ingestion run,
/// covering the MR fetch, discussion sync, resource-events, and
/// closes-issues phases. May hold partial totals on cancellation.
#[derive(Debug, Default)]
pub struct IngestMrProjectResult {
    /// MRs returned by the paginated fetch this run.
    pub mrs_fetched: usize,
    /// MRs inserted or updated in the local DB.
    pub mrs_upserted: usize,
    /// Labels newly created while upserting MRs.
    pub labels_created: usize,
    /// Assignee links written for MRs.
    pub assignees_linked: usize,
    /// Reviewer links written for MRs.
    pub reviewers_linked: usize,
    /// Discussions fetched across all synced MRs.
    pub discussions_fetched: usize,
    /// Discussions inserted or updated locally.
    pub discussions_upserted: usize,
    /// Notes inserted or updated locally.
    pub notes_upserted: usize,
    /// Notes dropped because of unusable timestamps.
    pub notes_skipped_bad_timestamp: usize,
    /// Diff (code-anchored) notes encountered.
    pub diffnotes_count: usize,
    /// MRs counted as fully synced (only when pagination succeeded).
    pub mrs_synced_discussions: usize,
    /// MRs in the DB that did not need a discussion sync.
    pub mrs_skipped_discussion_sync: usize,
    /// Resource-event jobs drained successfully.
    pub resource_events_fetched: usize,
    /// Resource-event jobs that failed (fetch or store).
    pub resource_events_failed: usize,
    /// Closes-issues jobs drained successfully.
    pub closes_issues_fetched: usize,
    /// Closes-issues jobs that failed (fetch or store).
    pub closes_issues_failed: usize,
}
/// Convenience wrapper around [`ingest_project_issues_with_progress`] for
/// callers that want neither progress callbacks nor cooperative shutdown.
///
/// A fresh [`ShutdownSignal`] is created locally; nothing in this function
/// ever cancels it, so the run proceeds until completion or error.
pub async fn ingest_project_issues(
    conn: &Connection,
    client: &GitLabClient,
    config: &Config,
    project_id: i64,
    gitlab_project_id: i64,
) -> Result<IngestProjectResult> {
    // Local, never-triggered-here signal satisfies the `&ShutdownSignal`
    // parameter of the full entry point.
    let local_signal = ShutdownSignal::new();
    ingest_project_issues_with_progress(
        conn,
        client,
        config,
        project_id,
        gitlab_project_id,
        None,
        &local_signal,
    )
    .await
}
/// Ingest a project's issues with optional progress reporting and
/// cooperative shutdown.
///
/// Phases, in order:
/// 1. Paginated issue fetch/upsert (`ingest_issues`, which also receives
///    `signal` so it can stop mid-pagination).
/// 2. Discussion sync for issues the fetch flagged as needing it.
/// 3. If `config.sync.fetch_resource_events` is set: enqueue and drain
///    resource-event jobs, then extract cross-references from state events.
///
/// `signal` is checked between phases; on cancellation the partial result
/// accumulated so far is returned as `Ok`, never as an error.
#[instrument(
    skip(conn, client, config, progress, signal),
    fields(project_id, gitlab_project_id, items_processed, items_skipped, errors)
)]
pub async fn ingest_project_issues_with_progress(
    conn: &Connection,
    client: &GitLabClient,
    config: &Config,
    project_id: i64,
    gitlab_project_id: i64,
    progress: Option<ProgressCallback>,
    signal: &ShutdownSignal,
) -> Result<IngestProjectResult> {
    let mut result = IngestProjectResult::default();
    // Forward events to the caller's callback, if any.
    let emit = |event: ProgressEvent| {
        if let Some(ref cb) = progress {
            cb(event);
        }
    };
    emit(ProgressEvent::IssuesFetchStarted);
    // Phase 1: paginated fetch + upsert of issues.
    let issue_result =
        ingest_issues(conn, client, config, project_id, gitlab_project_id, signal).await?;
    result.issues_fetched = issue_result.fetched;
    result.issues_upserted = issue_result.upserted;
    result.labels_created = issue_result.labels_created;
    emit(ProgressEvent::IssuesFetchComplete {
        total: result.issues_fetched,
    });
    let issues_needing_sync = issue_result.issues_needing_discussion_sync;
    // "Skipped" = issues in the DB minus those flagged for a discussion
    // sync; saturating_sub guards against a stale flagged list.
    let total_issues: i64 = conn.query_row(
        "SELECT COUNT(*) FROM issues WHERE project_id = ?",
        [project_id],
        |row| row.get(0),
    )?;
    let total_issues = total_issues as usize;
    result.issues_skipped_discussion_sync = total_issues.saturating_sub(issues_needing_sync.len());
    // Phase boundary: honor a shutdown request before discussion sync.
    if signal.is_cancelled() {
        info!("Shutdown requested, returning partial issue results");
        return Ok(result);
    }
    if issues_needing_sync.is_empty() {
        info!("No issues need discussion sync");
    } else {
        info!(
            count = issues_needing_sync.len(),
            "Starting discussion sync for issues"
        );
        emit(ProgressEvent::DiscussionSyncStarted {
            total: issues_needing_sync.len(),
        });
        // Phase 2: per-issue discussion sync (may return partial results
        // on cancellation; each returned entry counts as one synced issue).
        let discussion_results = sync_discussions_sequential(
            conn,
            client,
            config,
            gitlab_project_id,
            project_id,
            &issues_needing_sync,
            &progress,
            signal,
        )
        .await?;
        emit(ProgressEvent::DiscussionSyncComplete);
        for disc_result in discussion_results {
            result.discussions_fetched += disc_result.discussions_fetched;
            result.discussions_upserted += disc_result.discussions_upserted;
            result.notes_upserted += disc_result.notes_upserted;
            result.issues_synced_discussions += 1;
        }
    }
    // Phase boundary: honor a shutdown request before resource events.
    if signal.is_cancelled() {
        info!("Shutdown requested, returning partial issue results");
        return Ok(result);
    }
    if config.sync.fetch_resource_events {
        // Phase 3: queue-based resource-event fetch for issues.
        let enqueued = enqueue_resource_events_for_entity_type(conn, project_id, "issue")?;
        if enqueued > 0 {
            debug!(enqueued, "Enqueued resource events jobs for issues");
        }
        let drain_result = drain_resource_events(
            conn,
            client,
            config,
            project_id,
            gitlab_project_id,
            &progress,
            signal,
        )
        .await?;
        result.resource_events_fetched = drain_result.fetched;
        result.resource_events_failed = drain_result.failed;
        // Derive issue/MR cross-references from freshly stored state events.
        let refs_inserted =
            crate::core::references::extract_refs_from_state_events(conn, project_id)?;
        if refs_inserted > 0 {
            debug!(
                refs_inserted,
                "Extracted cross-references from state events"
            );
        }
    }
    info!(
        issues_fetched = result.issues_fetched,
        issues_upserted = result.issues_upserted,
        labels_created = result.labels_created,
        discussions_fetched = result.discussions_fetched,
        notes_upserted = result.notes_upserted,
        issues_synced = result.issues_synced_discussions,
        issues_skipped = result.issues_skipped_discussion_sync,
        resource_events_fetched = result.resource_events_fetched,
        resource_events_failed = result.resource_events_failed,
        "Project ingestion complete"
    );
    // Fill the #[instrument] span fields declared above.
    tracing::Span::current().record("items_processed", result.issues_upserted);
    tracing::Span::current().record("items_skipped", result.issues_skipped_discussion_sync);
    tracing::Span::current().record("errors", result.resource_events_failed);
    Ok(result)
}
/// Sync discussions for each issue strictly one at a time, checking the
/// shutdown signal before every issue.
///
/// Unlike [`sync_mr_discussions_sequential`] there is no concurrent
/// prefetch phase here — each `ingest_issue_discussions` call is awaited
/// serially — so the previous `dependent_concurrency`-sized chunking added
/// no parallelism and only coarsened the cancellation check to once per
/// batch. Iterating issues directly makes Ctrl+C take effect within one
/// issue's worth of work.
///
/// Returns the per-issue results accumulated so far; partial (as `Ok`) if
/// cancellation interrupted the loop.
#[allow(clippy::too_many_arguments)]
async fn sync_discussions_sequential(
    conn: &Connection,
    client: &GitLabClient,
    config: &Config,
    gitlab_project_id: i64,
    local_project_id: i64,
    issues: &[IssueForDiscussionSync],
    progress: &Option<ProgressCallback>,
    signal: &ShutdownSignal,
) -> Result<Vec<super::discussions::IngestDiscussionsResult>> {
    let total = issues.len();
    let mut results = Vec::with_capacity(total);
    for issue in issues {
        // Per-issue check (was per-batch): break promptly on shutdown and
        // let the caller commit whatever was already processed.
        if signal.is_cancelled() {
            info!("Shutdown requested during discussion sync, returning partial results");
            break;
        }
        let disc_result = ingest_issue_discussions(
            conn,
            client,
            config,
            gitlab_project_id,
            local_project_id,
            std::slice::from_ref(issue),
        )
        .await?;
        results.push(disc_result);
        if let Some(cb) = progress {
            cb(ProgressEvent::DiscussionSynced {
                current: results.len(),
                total,
            });
        }
    }
    Ok(results)
}
/// Convenience wrapper around [`ingest_project_merge_requests_with_progress`]
/// for callers that want neither progress callbacks nor cooperative shutdown.
///
/// A fresh [`ShutdownSignal`] is created locally; nothing in this function
/// ever cancels it, so the run proceeds until completion or error.
pub async fn ingest_project_merge_requests(
    conn: &Connection,
    client: &GitLabClient,
    config: &Config,
    project_id: i64,
    gitlab_project_id: i64,
    full_sync: bool,
) -> Result<IngestMrProjectResult> {
    // Local, never-triggered-here signal satisfies the `&ShutdownSignal`
    // parameter of the full entry point.
    let local_signal = ShutdownSignal::new();
    ingest_project_merge_requests_with_progress(
        conn,
        client,
        config,
        project_id,
        gitlab_project_id,
        full_sync,
        None,
        &local_signal,
    )
    .await
}
/// Ingest a project's merge requests with optional progress reporting and
/// cooperative shutdown.
///
/// Phases, in order:
/// 1. Paginated MR fetch/upsert (`ingest_merge_requests`, which receives
///    `signal` so it can stop mid-pagination; `full_sync` is forwarded).
/// 2. Discussion sync for MRs flagged as needing it.
/// 3. If `config.sync.fetch_resource_events` is set: enqueue/drain
///    resource-event jobs and extract refs from state events.
/// 4. Extract cross-references from system notes, then always (not
///    config-gated) enqueue/drain closes-issues jobs.
///
/// `signal` is checked between phases; on cancellation the partial result
/// accumulated so far is returned as `Ok`, never as an error.
#[instrument(
    skip(conn, client, config, progress, signal),
    fields(project_id, gitlab_project_id, items_processed, items_skipped, errors)
)]
#[allow(clippy::too_many_arguments)]
pub async fn ingest_project_merge_requests_with_progress(
    conn: &Connection,
    client: &GitLabClient,
    config: &Config,
    project_id: i64,
    gitlab_project_id: i64,
    full_sync: bool,
    progress: Option<ProgressCallback>,
    signal: &ShutdownSignal,
) -> Result<IngestMrProjectResult> {
    let mut result = IngestMrProjectResult::default();
    // Forward events to the caller's callback, if any.
    let emit = |event: ProgressEvent| {
        if let Some(ref cb) = progress {
            cb(event);
        }
    };
    emit(ProgressEvent::MrsFetchStarted);
    // Phase 1: paginated fetch + upsert of merge requests.
    let mr_result = ingest_merge_requests(
        conn,
        client,
        config,
        project_id,
        gitlab_project_id,
        full_sync,
        signal,
    )
    .await?;
    result.mrs_fetched = mr_result.fetched;
    result.mrs_upserted = mr_result.upserted;
    result.labels_created = mr_result.labels_created;
    result.assignees_linked = mr_result.assignees_linked;
    result.reviewers_linked = mr_result.reviewers_linked;
    emit(ProgressEvent::MrsFetchComplete {
        total: result.mrs_fetched,
    });
    // Unlike the issue path, MRs needing a sync are re-queried from the DB
    // rather than taken from the fetch result.
    let mrs_needing_sync = get_mrs_needing_discussion_sync(conn, project_id)?;
    let total_mrs: i64 = conn.query_row(
        "SELECT COUNT(*) FROM merge_requests WHERE project_id = ?",
        [project_id],
        |row| row.get(0),
    )?;
    let total_mrs = total_mrs as usize;
    result.mrs_skipped_discussion_sync = total_mrs.saturating_sub(mrs_needing_sync.len());
    // Phase boundary: honor a shutdown request before discussion sync.
    if signal.is_cancelled() {
        info!("Shutdown requested, returning partial MR results");
        return Ok(result);
    }
    if mrs_needing_sync.is_empty() {
        info!("No MRs need discussion sync");
    } else {
        info!(
            count = mrs_needing_sync.len(),
            "Starting discussion sync for MRs"
        );
        emit(ProgressEvent::MrDiscussionSyncStarted {
            total: mrs_needing_sync.len(),
        });
        // Phase 2: batched prefetch + serial write of MR discussions (may
        // return partial results on cancellation).
        let discussion_results = sync_mr_discussions_sequential(
            conn,
            client,
            config,
            gitlab_project_id,
            project_id,
            &mrs_needing_sync,
            &progress,
            signal,
        )
        .await?;
        emit(ProgressEvent::MrDiscussionSyncComplete);
        for disc_result in discussion_results {
            result.discussions_fetched += disc_result.discussions_fetched;
            result.discussions_upserted += disc_result.discussions_upserted;
            result.notes_upserted += disc_result.notes_upserted;
            result.notes_skipped_bad_timestamp += disc_result.notes_skipped_bad_timestamp;
            result.diffnotes_count += disc_result.diffnotes_count;
            // Only MRs whose discussion pagination fully succeeded count
            // as synced; partial pages must be retried later.
            if disc_result.pagination_succeeded {
                result.mrs_synced_discussions += 1;
            }
        }
    }
    // Phase boundary: honor a shutdown request before resource events.
    if signal.is_cancelled() {
        info!("Shutdown requested, returning partial MR results");
        return Ok(result);
    }
    if config.sync.fetch_resource_events {
        // Phase 3: queue-based resource-event fetch for MRs.
        let enqueued = enqueue_resource_events_for_entity_type(conn, project_id, "merge_request")?;
        if enqueued > 0 {
            debug!(enqueued, "Enqueued resource events jobs for MRs");
        }
        let drain_result = drain_resource_events(
            conn,
            client,
            config,
            project_id,
            gitlab_project_id,
            &progress,
            signal,
        )
        .await?;
        result.resource_events_fetched = drain_result.fetched;
        result.resource_events_failed = drain_result.failed;
        // Derive cross-references from freshly stored state events.
        let refs_inserted =
            crate::core::references::extract_refs_from_state_events(conn, project_id)?;
        if refs_inserted > 0 {
            debug!(
                refs_inserted,
                "Extracted cross-references from state events"
            );
        }
    }
    // Phase boundary: honor a shutdown request before reference extraction.
    if signal.is_cancelled() {
        info!("Shutdown requested, returning partial MR results");
        return Ok(result);
    }
    // Phase 4a: mine system notes for cross-references.
    let note_refs = crate::core::note_parser::extract_refs_from_system_notes(conn, project_id)?;
    if note_refs.inserted > 0 || note_refs.skipped_unresolvable > 0 {
        debug!(
            inserted = note_refs.inserted,
            unresolvable = note_refs.skipped_unresolvable,
            parse_failures = note_refs.parse_failures,
            "Extracted cross-references from system notes (MRs)"
        );
    }
    // Phase 4b: closes-issues references — always fetched, not gated by
    // config.sync.fetch_resource_events.
    {
        let enqueued = enqueue_mr_closes_issues_jobs(conn, project_id)?;
        if enqueued > 0 {
            debug!(enqueued, "Enqueued mr_closes_issues jobs");
        }
        let closes_result = drain_mr_closes_issues(
            conn,
            client,
            config,
            project_id,
            gitlab_project_id,
            &progress,
            signal,
        )
        .await?;
        result.closes_issues_fetched = closes_result.fetched;
        result.closes_issues_failed = closes_result.failed;
    }
    info!(
        mrs_fetched = result.mrs_fetched,
        mrs_upserted = result.mrs_upserted,
        labels_created = result.labels_created,
        discussions_fetched = result.discussions_fetched,
        notes_upserted = result.notes_upserted,
        diffnotes = result.diffnotes_count,
        mrs_synced = result.mrs_synced_discussions,
        mrs_skipped = result.mrs_skipped_discussion_sync,
        resource_events_fetched = result.resource_events_fetched,
        resource_events_failed = result.resource_events_failed,
        closes_issues_fetched = result.closes_issues_fetched,
        closes_issues_failed = result.closes_issues_failed,
        "MR project ingestion complete"
    );
    // Fill the #[instrument] span fields declared above.
    tracing::Span::current().record("items_processed", result.mrs_upserted);
    tracing::Span::current().record("items_skipped", result.mrs_skipped_discussion_sync);
    tracing::Span::current().record("errors", result.resource_events_failed);
    Ok(result)
}
/// Sync MR discussions in batches of `dependent_concurrency`: each batch's
/// HTTP fetches run concurrently, then results are written to the DB
/// serially in batch order.
///
/// The shutdown signal is checked before each batch; on cancellation the
/// results accumulated so far are returned as `Ok` (partial).
#[allow(clippy::too_many_arguments)]
async fn sync_mr_discussions_sequential(
    conn: &Connection,
    client: &GitLabClient,
    config: &Config,
    gitlab_project_id: i64,
    local_project_id: i64,
    mrs: &[MrForDiscussionSync],
    progress: &Option<ProgressCallback>,
    signal: &ShutdownSignal,
) -> Result<Vec<super::mr_discussions::IngestMrDiscussionsResult>> {
    let batch_size = config.sync.dependent_concurrency as usize;
    let total = mrs.len();
    let mut results = Vec::with_capacity(total);
    for batch in mrs.chunks(batch_size) {
        if signal.is_cancelled() {
            info!("Shutdown requested during MR discussion sync, returning partial results");
            break;
        }
        // Phase 1: fire off the whole batch's HTTP fetches concurrently;
        // join_all preserves input order.
        let fetched_batch = join_all(batch.iter().map(|mr| {
            prefetch_mr_discussions(client, gitlab_project_id, local_project_id, mr.clone())
        }))
        .await;
        // Phase 2: serial DB writes — `results.len()` doubles as the
        // running progress counter.
        for fetched in fetched_batch {
            let write_result =
                write_prefetched_mr_discussions(conn, config, local_project_id, fetched)?;
            results.push(write_result);
            if let Some(cb) = progress {
                cb(ProgressEvent::MrDiscussionSynced {
                    current: results.len(),
                    total,
                });
            }
        }
    }
    Ok(results)
}
/// Counters produced by one drain pass over a dependent-fetch job queue
/// (resource events or closes-issues).
#[derive(Debug, Default)]
pub struct DrainResult {
    /// Jobs fetched and stored successfully.
    pub fetched: usize,
    /// Jobs that failed either the HTTP fetch or the DB write.
    pub failed: usize,
    /// Jobs marked complete without data due to a permanent API error.
    pub skipped_not_found: usize,
}
/// Enqueue one `resource_events` job per issue or merge request whose
/// `updated_at` has advanced past its resource-events sync watermark.
///
/// First prunes pending jobs for entities that are already up to date
/// (watermark >= `updated_at`), then enqueues the out-of-date ones.
/// Returns the count of jobs `enqueue_job` reported as newly inserted.
/// Unknown entity types are logged and yield `Ok(0)`.
fn enqueue_resource_events_for_entity_type(
    conn: &Connection,
    project_id: i64,
    entity_type: &str,
) -> Result<usize> {
    // Resolve the backing table once; both tables share the
    // `resource_events_synced_for_updated_at` watermark column. The two
    // previous per-type branches duplicated identical SQL.
    let table = match entity_type {
        "issue" => "issues",
        "merge_request" => "merge_requests",
        other => {
            warn!(
                entity_type = other,
                "Unknown entity_type in enqueue_resource_events, skipping stale job cleanup"
            );
            return Ok(0);
        }
    };
    // Prune stale pending jobs. The interpolated names come only from the
    // match above (never external input), so this is not an injection risk.
    conn.execute(
        &format!(
            "DELETE FROM pending_dependent_fetches \
             WHERE project_id = ?1 AND entity_type = '{entity_type}' AND job_type = 'resource_events' \
             AND entity_local_id IN ( \
                 SELECT id FROM {table} \
                 WHERE project_id = ?1 \
                 AND updated_at <= COALESCE(resource_events_synced_for_updated_at, 0) \
             )"
        ),
        [project_id],
    )?;
    // Collect (local id, iid) for every entity still behind its watermark.
    let entities: Vec<(i64, i64)> = {
        let mut stmt = conn.prepare_cached(&format!(
            "SELECT id, iid FROM {table} \
             WHERE project_id = ?1 \
             AND updated_at > COALESCE(resource_events_synced_for_updated_at, 0)"
        ))?;
        stmt.query_map([project_id], |row| Ok((row.get(0)?, row.get(1)?)))?
            .collect::<std::result::Result<Vec<_>, _>>()?
    };
    let mut enqueued = 0;
    for (local_id, iid) in entities {
        // Count only jobs enqueue_job reports as newly inserted.
        if enqueue_job(
            conn,
            project_id,
            entity_type,
            iid,
            local_id,
            "resource_events",
            None,
        )? {
            enqueued += 1;
        }
    }
    Ok(enqueued)
}
/// Result of a concurrent HTTP prefetch for resource events.
///
/// Bundles the claimed job's identifiers with the fetch outcome so the
/// serial DB-write phase can complete or fail exactly the right job.
#[allow(clippy::type_complexity)]
struct PrefetchedResourceEvents {
    /// Queue id of the claimed job, used for complete/fail bookkeeping.
    job_id: i64,
    /// Local project id the entity belongs to.
    project_id: i64,
    /// Entity kind string ("issue" or "merge_request") passed to the client.
    entity_type: String,
    /// Project-scoped iid handed to the GitLab client for the fetch.
    entity_iid: i64,
    /// Local DB row id of the entity (for storage and watermark updates).
    entity_local_id: i64,
    /// State, label, and milestone events on success; the API error otherwise.
    result: std::result::Result<
        (
            Vec<crate::gitlab::types::GitLabStateEvent>,
            Vec<crate::gitlab::types::GitLabLabelEvent>,
            Vec<crate::gitlab::types::GitLabMilestoneEvent>,
        ),
        crate::core::error::LoreError,
    >,
}
/// Run the HTTP fetch for one resource-events job and package the outcome
/// together with the job's identifying fields.
///
/// Never fails itself — any fetch error is carried inside the returned
/// struct so the caller can decide how to dispose of the job.
async fn prefetch_resource_events(
    client: &GitLabClient,
    gitlab_project_id: i64,
    job_id: i64,
    project_id: i64,
    entity_type: String,
    entity_iid: i64,
    entity_local_id: i64,
) -> PrefetchedResourceEvents {
    let fetch_outcome = client
        .fetch_all_resource_events(gitlab_project_id, &entity_type, entity_iid)
        .await;
    PrefetchedResourceEvents {
        job_id,
        project_id,
        entity_type,
        entity_iid,
        entity_local_id,
        result: fetch_outcome,
    }
}
/// Drain the `resource_events` job queue for a project: claim jobs in
/// batches, fetch each batch's events concurrently over HTTP, then write
/// them to the DB serially.
///
/// Per job: on a successful fetch+store the job is completed and the
/// entity's watermark advanced in one transaction; a permanent API error
/// also completes the job (counted in `skipped_not_found`); any other
/// failure calls `fail_job`. The shutdown signal is checked before each
/// batch claim, returning partial counts as `Ok`.
#[instrument(
    skip(conn, client, config, progress, signal),
    fields(project_id, gitlab_project_id, items_processed, errors)
)]
async fn drain_resource_events(
    conn: &Connection,
    client: &GitLabClient,
    config: &Config,
    project_id: i64,
    gitlab_project_id: i64,
    progress: &Option<ProgressCallback>,
    signal: &ShutdownSignal,
) -> Result<DrainResult> {
    let mut result = DrainResult::default();
    let batch_size = config.sync.dependent_concurrency as usize;
    // Free locks abandoned by a previous crashed/cancelled run.
    let reclaimed = reclaim_stale_locks(conn, config.sync.stale_lock_minutes)?;
    if reclaimed > 0 {
        info!(reclaimed, "Reclaimed stale resource event locks");
    }
    let claimable_counts = count_claimable_jobs(conn, project_id)?;
    let total_pending = claimable_counts
        .get("resource_events")
        .copied()
        .unwrap_or(0);
    // Nothing to drain: skip progress events entirely.
    if total_pending == 0 {
        return Ok(result);
    }
    let emit = |event: ProgressEvent| {
        if let Some(cb) = progress {
            cb(event);
        }
    };
    emit(ProgressEvent::ResourceEventsFetchStarted {
        total: total_pending,
    });
    let mut processed = 0;
    // Tracks job ids already handled this run so a job that gets
    // re-claimed cannot keep the loop spinning forever.
    let mut seen_job_ids = std::collections::HashSet::new();
    loop {
        // Check cancellation once per claimed batch.
        if signal.is_cancelled() {
            info!("Shutdown requested during resource events drain, returning partial results");
            break;
        }
        let jobs = claim_jobs(conn, "resource_events", project_id, batch_size)?;
        if jobs.is_empty() {
            break;
        }
        // Phase 1: Concurrent HTTP fetches
        let futures: Vec<_> = jobs
            .iter()
            .filter(|j| seen_job_ids.insert(j.id))
            .map(|j| {
                prefetch_resource_events(
                    client,
                    gitlab_project_id,
                    j.id,
                    j.project_id,
                    j.entity_type.clone(),
                    j.entity_iid,
                    j.entity_local_id,
                )
            })
            .collect();
        // Every claimed job was a repeat: bail out rather than loop forever.
        if futures.is_empty() {
            warn!("All claimed jobs were already processed, breaking drain loop");
            break;
        }
        let prefetched = join_all(futures).await;
        // Phase 2: Serial DB writes
        for p in prefetched {
            match p.result {
                Ok((state_events, label_events, milestone_events)) => {
                    let store_result = store_resource_events(
                        conn,
                        p.project_id,
                        &p.entity_type,
                        p.entity_local_id,
                        &state_events,
                        &label_events,
                        &milestone_events,
                    );
                    match store_result {
                        Ok(()) => {
                            // Complete the job and advance the watermark
                            // atomically so neither can land without the other.
                            let tx = conn.unchecked_transaction()?;
                            complete_job_tx(&tx, p.job_id)?;
                            update_resource_event_watermark_tx(
                                &tx,
                                &p.entity_type,
                                p.entity_local_id,
                            )?;
                            tx.commit()?;
                            result.fetched += 1;
                        }
                        Err(e) => {
                            warn!(
                                entity_type = %p.entity_type,
                                entity_iid = p.entity_iid,
                                error = %e,
                                "Failed to store resource events"
                            );
                            fail_job(conn, p.job_id, &e.to_string())?;
                            result.failed += 1;
                        }
                    }
                }
                Err(e) => {
                    // Permanent API errors will never succeed on retry, so
                    // complete the job (with watermark) instead of failing it.
                    if e.is_permanent_api_error() {
                        debug!(
                            entity_type = %p.entity_type,
                            entity_iid = p.entity_iid,
                            error = %e,
                            "Permanent API error for resource events, marking complete"
                        );
                        let tx = conn.unchecked_transaction()?;
                        complete_job_tx(&tx, p.job_id)?;
                        update_resource_event_watermark_tx(&tx, &p.entity_type, p.entity_local_id)?;
                        tx.commit()?;
                        result.skipped_not_found += 1;
                    } else {
                        warn!(
                            entity_type = %p.entity_type,
                            entity_iid = p.entity_iid,
                            error = %e,
                            "Failed to fetch resource events from GitLab"
                        );
                        fail_job(conn, p.job_id, &e.to_string())?;
                        result.failed += 1;
                    }
                }
            }
            processed += 1;
            emit(ProgressEvent::ResourceEventFetched {
                current: processed,
                total: total_pending,
            });
        }
    }
    emit(ProgressEvent::ResourceEventsFetchComplete {
        fetched: result.fetched,
        failed: result.failed,
    });
    if result.fetched > 0 || result.failed > 0 {
        info!(
            fetched = result.fetched,
            failed = result.failed,
            "Resource events drain complete"
        );
    }
    // Fill the #[instrument] span fields declared above.
    tracing::Span::current().record("items_processed", result.fetched);
    tracing::Span::current().record("errors", result.failed);
    Ok(result)
}
/// Persist all fetched resource events for one entity inside a single
/// transaction, so a mid-write failure leaves no partial event data.
///
/// Empty event lists are skipped; the transaction still commits (possibly
/// as a no-op) so the caller can treat success uniformly.
fn store_resource_events(
    conn: &Connection,
    project_id: i64,
    entity_type: &str,
    entity_local_id: i64,
    state_events: &[crate::gitlab::types::GitLabStateEvent],
    label_events: &[crate::gitlab::types::GitLabLabelEvent],
    milestone_events: &[crate::gitlab::types::GitLabMilestoneEvent],
) -> Result<()> {
    let txn = conn.unchecked_transaction()?;
    // Slice pattern `[_, ..]` matches any non-empty slice.
    if let [_, ..] = state_events {
        crate::core::events_db::upsert_state_events(
            &txn,
            project_id,
            entity_type,
            entity_local_id,
            state_events,
        )?;
    }
    if let [_, ..] = label_events {
        crate::core::events_db::upsert_label_events(
            &txn,
            project_id,
            entity_type,
            entity_local_id,
            label_events,
        )?;
    }
    if let [_, ..] = milestone_events {
        crate::core::events_db::upsert_milestone_events(
            &txn,
            project_id,
            entity_type,
            entity_local_id,
            milestone_events,
        )?;
    }
    txn.commit()?;
    Ok(())
}
/// Record, inside the caller's open transaction, that an MR's
/// closes-issues references are synced up to its current `updated_at`.
fn update_closes_issues_watermark_tx(
    tx: &rusqlite::Transaction<'_>,
    mr_local_id: i64,
) -> Result<()> {
    let sql =
        "UPDATE merge_requests SET closes_issues_synced_for_updated_at = updated_at WHERE id = ?";
    // Discard the affected-row count; only success matters here.
    tx.execute(sql, [mr_local_id])?;
    Ok(())
}
/// Advance an entity's resource-events sync watermark to its current
/// `updated_at`, inside the caller's open transaction.
///
/// Unknown entity types are logged and left untouched (still `Ok`).
fn update_resource_event_watermark_tx(
    tx: &rusqlite::Transaction<'_>,
    entity_type: &str,
    entity_local_id: i64,
) -> Result<()> {
    // Resolve the backing table once; both tables share the watermark
    // column. The two previous branches duplicated identical SQL.
    let table = match entity_type {
        "issue" => "issues",
        "merge_request" => "merge_requests",
        other => {
            warn!(
                entity_type = other,
                "Unknown entity_type in watermark update, skipping"
            );
            return Ok(());
        }
    };
    // `table` comes only from the match above, never from external input.
    tx.execute(
        &format!(
            "UPDATE {table} SET resource_events_synced_for_updated_at = updated_at WHERE id = ?"
        ),
        [entity_local_id],
    )?;
    Ok(())
}
/// Queue one `mr_closes_issues` job per MR whose `updated_at` has moved
/// past its closes-issues watermark, after pruning pending jobs for MRs
/// that are already up to date. Returns how many jobs were newly enqueued.
fn enqueue_mr_closes_issues_jobs(conn: &Connection, project_id: i64) -> Result<usize> {
    // Prune stale jobs: MRs unchanged since their last closes_issues sync.
    conn.execute(
        "DELETE FROM pending_dependent_fetches \
         WHERE project_id = ?1 AND entity_type = 'merge_request' AND job_type = 'mr_closes_issues' \
         AND entity_local_id IN ( \
             SELECT id FROM merge_requests \
             WHERE project_id = ?1 \
             AND updated_at <= COALESCE(closes_issues_synced_for_updated_at, 0) \
         )",
        [project_id],
    )?;
    // Collect (local id, iid) for every MR still behind its watermark.
    let mut stmt = conn.prepare_cached(
        "SELECT id, iid FROM merge_requests \
         WHERE project_id = ?1 \
         AND updated_at > COALESCE(closes_issues_synced_for_updated_at, 0)",
    )?;
    let stale_mrs: Vec<(i64, i64)> = stmt
        .query_map([project_id], |row| Ok((row.get(0)?, row.get(1)?)))?
        .collect::<std::result::Result<Vec<_>, _>>()?;
    let mut newly_enqueued = 0;
    for (local_id, iid) in stale_mrs {
        // Count only jobs enqueue_job reports as newly inserted.
        let inserted = enqueue_job(
            conn,
            project_id,
            "merge_request",
            iid,
            local_id,
            "mr_closes_issues",
            None,
        )?;
        if inserted {
            newly_enqueued += 1;
        }
    }
    Ok(newly_enqueued)
}
/// Result of a concurrent HTTP prefetch for closes-issues references.
///
/// Bundles the claimed job's identifiers with the fetch outcome so the
/// serial DB-write phase can complete or fail exactly the right job.
struct PrefetchedClosesIssues {
    /// Queue id of the claimed job, used for complete/fail bookkeeping.
    job_id: i64,
    /// MR iid handed to the GitLab client for the fetch.
    entity_iid: i64,
    /// Local DB row id of the MR (for storage and watermark updates).
    entity_local_id: i64,
    /// Issue references the MR closes on success; the API error otherwise.
    result: std::result::Result<
        Vec<crate::gitlab::types::GitLabIssueRef>,
        crate::core::error::LoreError,
    >,
}
/// Run the HTTP fetch for one closes-issues job and package the outcome
/// together with the job's identifying fields.
///
/// Never fails itself — any fetch error is carried inside the returned
/// struct so the caller can decide how to dispose of the job.
async fn prefetch_closes_issues(
    client: &GitLabClient,
    gitlab_project_id: i64,
    job_id: i64,
    entity_iid: i64,
    entity_local_id: i64,
) -> PrefetchedClosesIssues {
    let fetch_outcome = client
        .fetch_mr_closes_issues(gitlab_project_id, entity_iid)
        .await;
    PrefetchedClosesIssues {
        job_id,
        entity_iid,
        entity_local_id,
        result: fetch_outcome,
    }
}
/// Drain the `mr_closes_issues` job queue for a project: claim jobs in
/// batches, fetch each batch's closes-issues lists concurrently over HTTP,
/// then write them to the DB serially.
///
/// Per job: on a successful fetch+store the job is completed and the MR's
/// watermark advanced in one transaction; a permanent API error also
/// completes the job (counted in `skipped_not_found`); any other failure
/// calls `fail_job`. The shutdown signal is checked before each batch
/// claim, returning partial counts as `Ok`.
#[instrument(
    skip(conn, client, config, progress, signal),
    fields(project_id, gitlab_project_id, items_processed, errors)
)]
async fn drain_mr_closes_issues(
    conn: &Connection,
    client: &GitLabClient,
    config: &Config,
    project_id: i64,
    gitlab_project_id: i64,
    progress: &Option<ProgressCallback>,
    signal: &ShutdownSignal,
) -> Result<DrainResult> {
    let mut result = DrainResult::default();
    let batch_size = config.sync.dependent_concurrency as usize;
    // Free locks abandoned by a previous crashed/cancelled run.
    let reclaimed = reclaim_stale_locks(conn, config.sync.stale_lock_minutes)?;
    if reclaimed > 0 {
        info!(reclaimed, "Reclaimed stale mr_closes_issues locks");
    }
    let claimable_counts = count_claimable_jobs(conn, project_id)?;
    let total_pending = claimable_counts
        .get("mr_closes_issues")
        .copied()
        .unwrap_or(0);
    // Nothing to drain: skip progress events entirely.
    if total_pending == 0 {
        return Ok(result);
    }
    let emit = |event: ProgressEvent| {
        if let Some(cb) = progress {
            cb(event);
        }
    };
    emit(ProgressEvent::ClosesIssuesFetchStarted {
        total: total_pending,
    });
    let mut processed = 0;
    // Tracks job ids already handled this run so a job that gets
    // re-claimed cannot keep the loop spinning forever.
    let mut seen_job_ids = std::collections::HashSet::new();
    loop {
        // Check cancellation once per claimed batch.
        if signal.is_cancelled() {
            info!("Shutdown requested during closes_issues drain, returning partial results");
            break;
        }
        let jobs = claim_jobs(conn, "mr_closes_issues", project_id, batch_size)?;
        if jobs.is_empty() {
            break;
        }
        // Phase 1: Concurrent HTTP fetches
        let futures: Vec<_> = jobs
            .iter()
            .filter(|j| seen_job_ids.insert(j.id))
            .map(|j| {
                prefetch_closes_issues(
                    client,
                    gitlab_project_id,
                    j.id,
                    j.entity_iid,
                    j.entity_local_id,
                )
            })
            .collect();
        // Every claimed job was a repeat: bail out rather than loop forever.
        if futures.is_empty() {
            warn!("All claimed mr_closes_issues jobs were already processed, breaking drain loop");
            break;
        }
        let prefetched = join_all(futures).await;
        // Phase 2: Serial DB writes
        for p in prefetched {
            match p.result {
                Ok(closes_issues) => {
                    let store_result = store_closes_issues_refs(
                        conn,
                        project_id,
                        p.entity_local_id,
                        &closes_issues,
                    );
                    match store_result {
                        Ok(()) => {
                            // Complete the job and advance the watermark
                            // atomically so neither can land without the other.
                            let tx = conn.unchecked_transaction()?;
                            complete_job_tx(&tx, p.job_id)?;
                            update_closes_issues_watermark_tx(&tx, p.entity_local_id)?;
                            tx.commit()?;
                            result.fetched += 1;
                        }
                        Err(e) => {
                            warn!(
                                entity_iid = p.entity_iid,
                                error = %e,
                                "Failed to store closes_issues references"
                            );
                            fail_job(conn, p.job_id, &e.to_string())?;
                            result.failed += 1;
                        }
                    }
                }
                Err(e) => {
                    // Permanent API errors will never succeed on retry, so
                    // complete the job (with watermark) instead of failing it.
                    if e.is_permanent_api_error() {
                        debug!(
                            entity_iid = p.entity_iid,
                            error = %e,
                            "Permanent API error for closes_issues, marking complete"
                        );
                        let tx = conn.unchecked_transaction()?;
                        complete_job_tx(&tx, p.job_id)?;
                        update_closes_issues_watermark_tx(&tx, p.entity_local_id)?;
                        tx.commit()?;
                        result.skipped_not_found += 1;
                    } else {
                        warn!(
                            entity_iid = p.entity_iid,
                            error = %e,
                            "Failed to fetch closes_issues from GitLab"
                        );
                        fail_job(conn, p.job_id, &e.to_string())?;
                        result.failed += 1;
                    }
                }
            }
            processed += 1;
            emit(ProgressEvent::ClosesIssueFetched {
                current: processed,
                total: total_pending,
            });
        }
    }
    emit(ProgressEvent::ClosesIssuesFetchComplete {
        fetched: result.fetched,
        failed: result.failed,
    });
    if result.fetched > 0 || result.failed > 0 {
        info!(
            fetched = result.fetched,
            failed = result.failed,
            "mr_closes_issues drain complete"
        );
    }
    // Fill the #[instrument] span fields declared above.
    tracing::Span::current().record("items_processed", result.fetched);
    tracing::Span::current().record("errors", result.failed);
    Ok(result)
}
/// Store "MR closes issue" references atomically: all references for this
/// MR are written under a savepoint, so a mid-loop failure rolls back
/// every one of them.
///
/// Each closed issue resolves to a local reference when the issue exists
/// in the local DB; otherwise a path+iid reference is stored, falling back
/// to a `gitlab_project:<id>` placeholder when even the project path is
/// unknown.
///
/// NOTE(review): a savepoint (not BEGIN) is presumably used so this works
/// inside a caller's already-open transaction — confirm.
fn store_closes_issues_refs(
    conn: &Connection,
    project_id: i64,
    mr_local_id: i64,
    closes_issues: &[crate::gitlab::types::GitLabIssueRef],
) -> Result<()> {
    conn.execute_batch("SAVEPOINT store_closes_refs")?;
    // All fallible work lives in this closure so the savepoint can be
    // released or rolled back from a single place below.
    let inner = || -> Result<()> {
        for issue_ref in closes_issues {
            let target_local_id = resolve_issue_local_id(conn, project_id, issue_ref.iid)?;
            // Local id if the issue is known locally; otherwise a
            // (path, iid) pair for cross-project / not-yet-synced issues.
            let (target_id, target_path, target_iid) = if let Some(local_id) = target_local_id {
                (Some(local_id), None, None)
            } else {
                let path = resolve_project_path(conn, issue_ref.project_id)?;
                let fallback =
                    path.unwrap_or_else(|| format!("gitlab_project:{}", issue_ref.project_id));
                (None, Some(fallback), Some(issue_ref.iid))
            };
            let ref_ = EntityReference {
                project_id,
                source_entity_type: "merge_request",
                source_entity_id: mr_local_id,
                target_entity_type: "issue",
                target_entity_id: target_id,
                target_project_path: target_path.as_deref(),
                target_entity_iid: target_iid,
                reference_type: "closes",
                source_method: "api",
            };
            insert_entity_reference(conn, &ref_)?;
        }
        Ok(())
    };
    match inner() {
        Ok(()) => {
            conn.execute_batch("RELEASE store_closes_refs")?;
            Ok(())
        }
        Err(e) => {
            // Best-effort rollback: the original error `e` is what the
            // caller needs to see, so a rollback failure is ignored.
            let _ = conn.execute_batch("ROLLBACK TO store_closes_refs; RELEASE store_closes_refs");
            Err(e)
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A default issue result must start with every counter at zero.
    #[test]
    fn result_default_has_zero_counts() {
        let r = IngestProjectResult::default();
        for counter in [
            r.issues_fetched,
            r.issues_upserted,
            r.labels_created,
            r.discussions_fetched,
            r.notes_upserted,
            r.issues_synced_discussions,
            r.issues_skipped_discussion_sync,
            r.resource_events_fetched,
            r.resource_events_failed,
        ] {
            assert_eq!(counter, 0);
        }
    }

    /// A default MR result must start with every counter at zero.
    #[test]
    fn mr_result_default_has_zero_counts() {
        let r = IngestMrProjectResult::default();
        for counter in [
            r.mrs_fetched,
            r.mrs_upserted,
            r.labels_created,
            r.assignees_linked,
            r.reviewers_linked,
            r.discussions_fetched,
            r.discussions_upserted,
            r.notes_upserted,
            r.notes_skipped_bad_timestamp,
            r.diffnotes_count,
            r.mrs_synced_discussions,
            r.mrs_skipped_discussion_sync,
            r.resource_events_fetched,
            r.resource_events_failed,
        ] {
            assert_eq!(counter, 0);
        }
    }

    /// A default drain result must start with every counter at zero.
    #[test]
    fn drain_result_default_has_zero_counts() {
        let r = DrainResult::default();
        for counter in [r.fetched, r.failed, r.skipped_not_found] {
            assert_eq!(counter, 0);
        }
    }

    /// Construction smoke test for the resource-event progress variants.
    #[test]
    fn progress_event_resource_variants_exist() {
        let events = [
            ProgressEvent::ResourceEventsFetchStarted { total: 10 },
            ProgressEvent::ResourceEventFetched {
                current: 5,
                total: 10,
            },
            ProgressEvent::ResourceEventsFetchComplete {
                fetched: 8,
                failed: 2,
            },
        ];
        assert_eq!(events.len(), 3);
    }
}