Files
gitlore/src/ingestion/orchestrator.rs
Taylor Eernisse 880ad1d3fa refactor(events): Lift transaction control to callers, eliminate duplicated store functions
events_db.rs:
- Removed internal savepoints from upsert_state_events,
  upsert_label_events, and upsert_milestone_events. Each function
  previously created its own savepoint, making it impossible for
  callers to wrap all three in a single atomic transaction.
- Changed signatures from &mut Connection to &Connection, since
  savepoints are no longer created internally. This makes the
  functions compatible with rusqlite::Transaction (which derefs to
  Connection), allowing callers to pass a transaction directly.

orchestrator.rs:
- Deleted the three store_*_events_tx() functions (store_state_events_tx,
  store_label_events_tx, store_milestone_events_tx) which were
  hand-duplicated copies of the events_db upsert functions, created as
  a workaround for the &mut Connection requirement. Now that events_db
  accepts &Connection, store_resource_events() calls the canonical
  upsert functions directly through the unchecked_transaction.
- Replaced the max-iterations guard in drain_resource_events() with a
  HashSet-based deduplication of job IDs. The old guard used an
  arbitrary 2x multiplier on total_pending which could either terminate
  too early (if many retries were legitimate) or too late. The new
  approach precisely prevents reprocessing the same job within a single
  drain run, which is the actual invariant we need.

Net effect: ~133 lines of duplicated SQL removed, single source of
truth for event upsert logic, and callers control transaction scope.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-03 14:09:35 -05:00

737 lines
24 KiB
Rust

//! Ingestion orchestrator: coordinates issue/MR and discussion sync.
//!
//! Implements the canonical pattern:
//! 1. Fetch resources (issues or MRs) with cursor-based sync
//! 2. Identify resources needing discussion sync
//! 3. Execute discussion sync with parallel prefetch (fetch in parallel, write serially)
use futures::future::join_all;
use rusqlite::Connection;
use tracing::{debug, info, warn};
use crate::Config;
use crate::core::dependent_queue::{
claim_jobs, complete_job, count_pending_jobs, enqueue_job, fail_job, reclaim_stale_locks,
};
use crate::core::error::Result;
use crate::gitlab::GitLabClient;
use super::discussions::ingest_issue_discussions;
use super::issues::{IssueForDiscussionSync, ingest_issues};
use super::merge_requests::{
MrForDiscussionSync, get_mrs_needing_discussion_sync, ingest_merge_requests,
};
use super::mr_discussions::{prefetch_mr_discussions, write_prefetched_mr_discussions};
/// Progress callback for ingestion operations.
///
/// Boxed trait object so callers may pass any closure; the `Send + Sync`
/// bounds allow the callback to be held across `.await` points in the async
/// ingestion functions below.
pub type ProgressCallback = Box<dyn Fn(ProgressEvent) + Send + Sync>;
/// Progress events emitted during ingestion.
///
/// Variants come in started / per-item / complete groups for each phase
/// (issue fetch, issue discussion sync, MR fetch, MR discussion sync,
/// resource-event drain) so consumers can render phase-scoped progress.
#[derive(Debug, Clone)]
pub enum ProgressEvent {
    /// Issue fetching started
    IssuesFetchStarted,
    /// An issue was fetched (current count)
    IssueFetched { count: usize },
    /// Issue fetching complete
    IssuesFetchComplete { total: usize },
    /// Discussion sync started (total issues to sync)
    DiscussionSyncStarted { total: usize },
    /// Discussion synced for an issue (current/total)
    DiscussionSynced { current: usize, total: usize },
    /// Discussion sync complete
    DiscussionSyncComplete,
    /// MR fetching started
    MrsFetchStarted,
    /// An MR was fetched (current count)
    MrFetched { count: usize },
    /// MR fetching complete
    MrsFetchComplete { total: usize },
    /// MR discussion sync started (total MRs to sync)
    MrDiscussionSyncStarted { total: usize },
    /// MR discussion synced (current/total)
    MrDiscussionSynced { current: usize, total: usize },
    /// MR discussion sync complete
    MrDiscussionSyncComplete,
    /// Resource event fetching started (total jobs)
    ResourceEventsFetchStarted { total: usize },
    /// Resource event fetched for an entity (current/total)
    ResourceEventFetched { current: usize, total: usize },
    /// Resource event fetching complete
    ResourceEventsFetchComplete { fetched: usize, failed: usize },
}
/// Result of full project ingestion (issues).
#[derive(Debug, Default)]
pub struct IngestProjectResult {
    /// Issues fetched from the GitLab API this run.
    pub issues_fetched: usize,
    /// Issues written (inserted or updated) to the local DB this run.
    pub issues_upserted: usize,
    /// Labels created while ingesting issues.
    pub labels_created: usize,
    /// Discussions fetched across all synced issues.
    pub discussions_fetched: usize,
    /// Discussions written to the local DB.
    pub discussions_upserted: usize,
    /// Notes written to the local DB.
    pub notes_upserted: usize,
    /// Issues whose discussions were synced this run.
    pub issues_synced_discussions: usize,
    /// Issues skipped because they did not need discussion sync
    /// (computed against the full issue table, not just this run's upserts).
    pub issues_skipped_discussion_sync: usize,
    /// Resource-event jobs fetched and stored successfully.
    pub resource_events_fetched: usize,
    /// Resource-event jobs that failed (fetch or store).
    pub resource_events_failed: usize,
}
/// Result of MR ingestion for a project.
#[derive(Debug, Default)]
pub struct IngestMrProjectResult {
    /// MRs fetched from the GitLab API this run.
    pub mrs_fetched: usize,
    /// MRs written (inserted or updated) to the local DB this run.
    pub mrs_upserted: usize,
    /// Labels created while ingesting MRs.
    pub labels_created: usize,
    /// Assignee links written for MRs.
    pub assignees_linked: usize,
    /// Reviewer links written for MRs.
    pub reviewers_linked: usize,
    /// Discussions fetched across all synced MRs.
    pub discussions_fetched: usize,
    /// Discussions written to the local DB.
    pub discussions_upserted: usize,
    /// Notes written to the local DB.
    pub notes_upserted: usize,
    /// Notes dropped because their timestamp could not be parsed.
    pub notes_skipped_bad_timestamp: usize,
    /// Diff (code-position) notes encountered.
    pub diffnotes_count: usize,
    /// MRs whose discussion pagination fully succeeded this run.
    pub mrs_synced_discussions: usize,
    /// MRs skipped because they did not need discussion sync
    /// (computed against the full MR table, not just this run's upserts).
    pub mrs_skipped_discussion_sync: usize,
    /// Resource-event jobs fetched and stored successfully.
    pub resource_events_fetched: usize,
    /// Resource-event jobs that failed (fetch or store).
    pub resource_events_failed: usize,
}
/// Ingest all issues and their discussions for a project.
pub async fn ingest_project_issues(
conn: &Connection,
client: &GitLabClient,
config: &Config,
project_id: i64,
gitlab_project_id: i64,
) -> Result<IngestProjectResult> {
ingest_project_issues_with_progress(conn, client, config, project_id, gitlab_project_id, None)
.await
}
/// Ingest all issues and their discussions for a project with progress reporting.
///
/// Pipeline:
/// 1. Fetch + upsert issues (cursor-based sync inside `ingest_issues`).
/// 2. Compute the discussion-sync skip count against the full issue table.
/// 3. Sequentially sync discussions for issues flagged as needing it.
/// 4. Optionally enqueue and drain resource-event jobs for all issues.
pub async fn ingest_project_issues_with_progress(
    conn: &Connection,
    client: &GitLabClient,
    config: &Config,
    project_id: i64,
    gitlab_project_id: i64,
    progress: Option<ProgressCallback>,
) -> Result<IngestProjectResult> {
    let mut result = IngestProjectResult::default();
    let emit = |event: ProgressEvent| {
        if let Some(ref cb) = progress {
            cb(event);
        }
    };
    // Step 1: Ingest issues.
    // Emit fetch start/complete for parity with the MR path, which already
    // reports MrsFetchStarted / MrsFetchComplete; previously the issue path
    // never emitted its declared IssuesFetchStarted / IssuesFetchComplete
    // variants at all.
    emit(ProgressEvent::IssuesFetchStarted);
    let issue_result = ingest_issues(conn, client, config, project_id, gitlab_project_id).await?;
    result.issues_fetched = issue_result.fetched;
    result.issues_upserted = issue_result.upserted;
    result.labels_created = issue_result.labels_created;
    emit(ProgressEvent::IssuesFetchComplete {
        total: result.issues_fetched,
    });
    // Step 2: Compute the skip count for discussion sync.
    let issues_needing_sync = issue_result.issues_needing_discussion_sync;
    // Query actual total issues for accurate skip count (issues_upserted only counts this run)
    let total_issues: i64 = conn
        .query_row(
            "SELECT COUNT(*) FROM issues WHERE project_id = ?",
            [project_id],
            |row| row.get(0),
        )
        .unwrap_or(0);
    let total_issues = total_issues as usize;
    result.issues_skipped_discussion_sync = total_issues.saturating_sub(issues_needing_sync.len());
    // Step 3: Sync discussions for issues that need it.
    if issues_needing_sync.is_empty() {
        info!("No issues need discussion sync");
    } else {
        info!(
            count = issues_needing_sync.len(),
            "Starting discussion sync for issues"
        );
        emit(ProgressEvent::DiscussionSyncStarted {
            total: issues_needing_sync.len(),
        });
        // Execute sequential discussion sync (see function doc for why not concurrent)
        let discussion_results = sync_discussions_sequential(
            conn,
            client,
            config,
            gitlab_project_id,
            project_id,
            &issues_needing_sync,
            &progress,
        )
        .await?;
        emit(ProgressEvent::DiscussionSyncComplete);
        // Aggregate discussion results
        for disc_result in discussion_results {
            result.discussions_fetched += disc_result.discussions_fetched;
            result.discussions_upserted += disc_result.discussions_upserted;
            result.notes_upserted += disc_result.notes_upserted;
            result.issues_synced_discussions += 1;
        }
    }
    // Step 4: Enqueue and drain resource events (if enabled)
    if config.sync.fetch_resource_events {
        // Enqueue resource_events jobs for all issues in this project
        let enqueued = enqueue_resource_events_for_entity_type(conn, project_id, "issue")?;
        if enqueued > 0 {
            debug!(enqueued, "Enqueued resource events jobs for issues");
        }
        // Drain the queue
        let drain_result =
            drain_resource_events(conn, client, config, gitlab_project_id, &progress).await?;
        result.resource_events_fetched = drain_result.fetched;
        result.resource_events_failed = drain_result.failed;
    }
    info!(
        issues_fetched = result.issues_fetched,
        issues_upserted = result.issues_upserted,
        labels_created = result.labels_created,
        discussions_fetched = result.discussions_fetched,
        notes_upserted = result.notes_upserted,
        issues_synced = result.issues_synced_discussions,
        issues_skipped = result.issues_skipped_discussion_sync,
        resource_events_fetched = result.resource_events_fetched,
        resource_events_failed = result.resource_events_failed,
        "Project ingestion complete"
    );
    Ok(result)
}
/// Sync discussions sequentially for each issue.
///
/// NOTE: Despite the config having `dependent_concurrency`, we process sequentially
/// because rusqlite's `Connection` is not `Send` and cannot be shared across tasks.
/// True concurrency would require connection pooling (r2d2, deadpool, etc.).
///
/// Previously this iterated in chunks of `dependent_concurrency`, but the
/// chunking had no observable effect (progress was emitted per issue and
/// processing was strictly sequential either way), and `slice::chunks(0)`
/// panics if the config value were ever zero. Iterating the slice directly
/// removes both the dead code and the panic path.
async fn sync_discussions_sequential(
    conn: &Connection,
    client: &GitLabClient,
    config: &Config,
    gitlab_project_id: i64,
    local_project_id: i64,
    issues: &[IssueForDiscussionSync],
    progress: &Option<ProgressCallback>,
) -> Result<Vec<super::discussions::IngestDiscussionsResult>> {
    let total = issues.len();
    let mut results = Vec::with_capacity(total);
    for issue in issues {
        let disc_result = ingest_issue_discussions(
            conn,
            client,
            config,
            gitlab_project_id,
            local_project_id,
            std::slice::from_ref(issue),
        )
        .await?;
        results.push(disc_result);
        // Emit per-issue progress; `results.len()` is the running count.
        if let Some(cb) = progress {
            cb(ProgressEvent::DiscussionSynced {
                current: results.len(),
                total,
            });
        }
    }
    Ok(results)
}
/// Ingest all merge requests and their discussions for a project.
pub async fn ingest_project_merge_requests(
conn: &Connection,
client: &GitLabClient,
config: &Config,
project_id: i64,
gitlab_project_id: i64,
full_sync: bool,
) -> Result<IngestMrProjectResult> {
ingest_project_merge_requests_with_progress(
conn,
client,
config,
project_id,
gitlab_project_id,
full_sync,
None,
)
.await
}
/// Ingest all merge requests and their discussions for a project with progress reporting.
///
/// Pipeline:
/// 1. Fetch + upsert MRs (with their labels, assignees, reviewers).
/// 2. Query the DB for MRs still needing discussion sync.
/// 3. Sync those MRs' discussions with parallel API prefetch + serial writes.
/// 4. Optionally enqueue and drain resource-event jobs for all MRs.
pub async fn ingest_project_merge_requests_with_progress(
    conn: &Connection,
    client: &GitLabClient,
    config: &Config,
    project_id: i64,
    gitlab_project_id: i64,
    full_sync: bool,
    progress: Option<ProgressCallback>,
) -> Result<IngestMrProjectResult> {
    let mut result = IngestMrProjectResult::default();
    // Small shim so call sites don't repeat the Option dance.
    let emit = |event: ProgressEvent| {
        if let Some(ref cb) = progress {
            cb(event);
        }
    };
    // Step 1: Ingest MRs
    emit(ProgressEvent::MrsFetchStarted);
    let mr_result = ingest_merge_requests(
        conn,
        client,
        config,
        project_id,
        gitlab_project_id,
        full_sync,
    )
    .await?;
    result.mrs_fetched = mr_result.fetched;
    result.mrs_upserted = mr_result.upserted;
    result.labels_created = mr_result.labels_created;
    result.assignees_linked = mr_result.assignees_linked;
    result.reviewers_linked = mr_result.reviewers_linked;
    emit(ProgressEvent::MrsFetchComplete {
        total: result.mrs_fetched,
    });
    // Step 2: Query DB for MRs needing discussion sync
    // CRITICAL: Query AFTER ingestion to avoid memory growth during large ingests
    let mrs_needing_sync = get_mrs_needing_discussion_sync(conn, project_id)?;
    // Query total MRs for accurate skip count (mrs_upserted only counts this run).
    let total_mrs: i64 = conn
        .query_row(
            "SELECT COUNT(*) FROM merge_requests WHERE project_id = ?",
            [project_id],
            |row| row.get(0),
        )
        .unwrap_or(0);
    let total_mrs = total_mrs as usize;
    // saturating_sub: guards against mrs_needing_sync exceeding the count
    // (e.g. if the table changed between the two queries).
    result.mrs_skipped_discussion_sync = total_mrs.saturating_sub(mrs_needing_sync.len());
    // Step 3: Sync discussions for MRs that need it
    if mrs_needing_sync.is_empty() {
        info!("No MRs need discussion sync");
    } else {
        info!(
            count = mrs_needing_sync.len(),
            "Starting discussion sync for MRs"
        );
        emit(ProgressEvent::MrDiscussionSyncStarted {
            total: mrs_needing_sync.len(),
        });
        // Execute sequential MR discussion sync
        let discussion_results = sync_mr_discussions_sequential(
            conn,
            client,
            config,
            gitlab_project_id,
            project_id,
            &mrs_needing_sync,
            &progress,
        )
        .await?;
        emit(ProgressEvent::MrDiscussionSyncComplete);
        // Aggregate discussion results
        for disc_result in discussion_results {
            result.discussions_fetched += disc_result.discussions_fetched;
            result.discussions_upserted += disc_result.discussions_upserted;
            result.notes_upserted += disc_result.notes_upserted;
            result.notes_skipped_bad_timestamp += disc_result.notes_skipped_bad_timestamp;
            result.diffnotes_count += disc_result.diffnotes_count;
            // Only count an MR as "synced" when its discussion pagination fully
            // succeeded; partially-synced MRs are excluded from this counter.
            if disc_result.pagination_succeeded {
                result.mrs_synced_discussions += 1;
            }
        }
    }
    // Step 4: Enqueue and drain resource events (if enabled)
    if config.sync.fetch_resource_events {
        let enqueued = enqueue_resource_events_for_entity_type(conn, project_id, "merge_request")?;
        if enqueued > 0 {
            debug!(enqueued, "Enqueued resource events jobs for MRs");
        }
        let drain_result =
            drain_resource_events(conn, client, config, gitlab_project_id, &progress).await?;
        result.resource_events_fetched = drain_result.fetched;
        result.resource_events_failed = drain_result.failed;
    }
    info!(
        mrs_fetched = result.mrs_fetched,
        mrs_upserted = result.mrs_upserted,
        labels_created = result.labels_created,
        discussions_fetched = result.discussions_fetched,
        notes_upserted = result.notes_upserted,
        diffnotes = result.diffnotes_count,
        mrs_synced = result.mrs_synced_discussions,
        mrs_skipped = result.mrs_skipped_discussion_sync,
        resource_events_fetched = result.resource_events_fetched,
        resource_events_failed = result.resource_events_failed,
        "MR project ingestion complete"
    );
    Ok(result)
}
/// Sync discussions for MRs with parallel API prefetching.
///
/// Pattern: Fetch discussions for multiple MRs in parallel, then write serially.
/// This overlaps network I/O while respecting rusqlite's single-connection constraint.
async fn sync_mr_discussions_sequential(
    conn: &Connection,
    client: &GitLabClient,
    config: &Config,
    gitlab_project_id: i64,
    local_project_id: i64,
    mrs: &[MrForDiscussionSync],
    progress: &Option<ProgressCallback>,
) -> Result<Vec<super::mr_discussions::IngestMrDiscussionsResult>> {
    // Clamp to at least 1: `slice::chunks` panics on a chunk size of 0, so a
    // zero `dependent_concurrency` config value must not reach it.
    let batch_size = (config.sync.dependent_concurrency as usize).max(1);
    let total = mrs.len();
    let mut results = Vec::with_capacity(total);
    // Process in batches: parallel API fetch, serial DB write
    for chunk in mrs.chunks(batch_size) {
        // Step 1: Prefetch discussions for all MRs in this batch in parallel
        let prefetch_futures = chunk.iter().map(|mr| {
            prefetch_mr_discussions(client, gitlab_project_id, local_project_id, mr.clone())
        });
        let prefetched_batch = join_all(prefetch_futures).await;
        // Step 2: Write each prefetched result serially
        for prefetched in prefetched_batch {
            let disc_result =
                write_prefetched_mr_discussions(conn, config, local_project_id, prefetched)?;
            results.push(disc_result);
            // Emit per-MR progress; `results.len()` is the running count
            // (mirrors the issue-discussion path).
            if let Some(cb) = progress {
                cb(ProgressEvent::MrDiscussionSynced {
                    current: results.len(),
                    total,
                });
            }
        }
    }
    Ok(results)
}
/// Result of draining the resource events queue.
#[derive(Debug, Default)]
pub struct DrainResult {
    /// Jobs fetched from GitLab and stored successfully.
    pub fetched: usize,
    /// Jobs that failed at the fetch or store step (retried later via backoff).
    pub failed: usize,
}
/// Enqueue resource_events jobs for all entities of a given type in a project.
///
/// Uses the pending_dependent_fetches queue. Jobs are deduplicated by the UNIQUE
/// constraint, so re-enqueueing the same entity is a no-op.
fn enqueue_resource_events_for_entity_type(
    conn: &Connection,
    project_id: i64,
    entity_type: &str,
) -> Result<usize> {
    // Map the entity type onto its backing table; unknown types enqueue nothing.
    let (table, id_col) = match entity_type {
        "issue" => ("issues", "id"),
        "merge_request" => ("merge_requests", "id"),
        _ => return Ok(0),
    };
    // Collect (local_id, iid) pairs up front: the prepared statement borrows
    // `conn`, which `enqueue_job` also needs, so streaming rows while
    // enqueueing would conflict with the borrow.
    let sql = format!("SELECT {id_col}, iid FROM {table} WHERE project_id = ?1");
    let mut stmt = conn.prepare_cached(&sql)?;
    let rows: Vec<(i64, i64)> = stmt
        .query_map([project_id], |row| Ok((row.get(0)?, row.get(1)?)))?
        .collect::<std::result::Result<Vec<_>, _>>()?;
    // `enqueue_job` reports whether a row was actually inserted (INSERT OR
    // IGNORE semantics), so already-queued entities don't inflate the count.
    let mut newly_queued = 0usize;
    for (local_id, iid) in rows {
        let inserted = enqueue_job(
            conn,
            project_id,
            entity_type,
            iid,
            local_id,
            "resource_events",
            None,
        )?;
        if inserted {
            newly_queued += 1;
        }
    }
    Ok(newly_queued)
}
/// Drain pending resource_events jobs: claim, fetch from GitLab, store, complete/fail.
///
/// Processes jobs sequentially since `rusqlite::Connection` is not `Send`.
/// Uses exponential backoff on failure via `fail_job`.
///
/// Termination: the loop ends when `claim_jobs` returns an empty batch. The
/// `seen_job_ids` set guarantees each job id is fetched at most once per run.
async fn drain_resource_events(
    conn: &Connection,
    client: &GitLabClient,
    config: &Config,
    gitlab_project_id: i64,
    progress: &Option<ProgressCallback>,
) -> Result<DrainResult> {
    let mut result = DrainResult::default();
    // Batch size reuses the dependent-fetch concurrency knob, though jobs are
    // still processed one at a time within each claimed batch.
    let batch_size = config.sync.dependent_concurrency as usize;
    // Reclaim stale locks from crashed processes
    let reclaimed = reclaim_stale_locks(conn, config.sync.stale_lock_minutes)?;
    if reclaimed > 0 {
        info!(reclaimed, "Reclaimed stale resource event locks");
    }
    // Count total pending jobs for progress reporting
    let pending_counts = count_pending_jobs(conn)?;
    let total_pending = pending_counts.get("resource_events").copied().unwrap_or(0);
    if total_pending == 0 {
        return Ok(result);
    }
    let emit = |event: ProgressEvent| {
        if let Some(cb) = progress {
            cb(event);
        }
    };
    emit(ProgressEvent::ResourceEventsFetchStarted {
        total: total_pending,
    });
    let mut processed = 0;
    let mut seen_job_ids = std::collections::HashSet::new();
    loop {
        let jobs = claim_jobs(conn, "resource_events", batch_size)?;
        if jobs.is_empty() {
            break;
        }
        for job in &jobs {
            // Guard against re-processing a job that was failed and re-claimed
            // within the same drain run (shouldn't happen due to backoff, but
            // defensive against clock skew or zero-backoff edge cases).
            // NOTE(review): a job skipped here is neither completed nor failed,
            // so it presumably stays claimed until a later stale-lock reclaim —
            // confirm against claim_jobs/reclaim_stale_locks semantics.
            if !seen_job_ids.insert(job.id) {
                warn!(
                    job_id = job.id,
                    "Skipping already-processed job in same drain run"
                );
                continue;
            }
            match client
                .fetch_all_resource_events(gitlab_project_id, &job.entity_type, job.entity_iid)
                .await
            {
                Ok((state_events, label_events, milestone_events)) => {
                    // Store all three event families atomically; on store
                    // failure the job is failed (and will back off), not lost.
                    let store_result = store_resource_events(
                        conn,
                        job.project_id,
                        &job.entity_type,
                        job.entity_local_id,
                        &state_events,
                        &label_events,
                        &milestone_events,
                    );
                    match store_result {
                        Ok(()) => {
                            complete_job(conn, job.id)?;
                            result.fetched += 1;
                        }
                        Err(e) => {
                            warn!(
                                entity_type = %job.entity_type,
                                entity_iid = job.entity_iid,
                                error = %e,
                                "Failed to store resource events"
                            );
                            fail_job(conn, job.id, &e.to_string())?;
                            result.failed += 1;
                        }
                    }
                }
                Err(e) => {
                    warn!(
                        entity_type = %job.entity_type,
                        entity_iid = job.entity_iid,
                        error = %e,
                        "Failed to fetch resource events from GitLab"
                    );
                    fail_job(conn, job.id, &e.to_string())?;
                    result.failed += 1;
                }
            }
            // Progress counts only newly-seen jobs (skipped duplicates above
            // `continue` before reaching this point).
            processed += 1;
            emit(ProgressEvent::ResourceEventFetched {
                current: processed,
                total: total_pending,
            });
        }
    }
    emit(ProgressEvent::ResourceEventsFetchComplete {
        fetched: result.fetched,
        failed: result.failed,
    });
    if result.fetched > 0 || result.failed > 0 {
        info!(
            fetched = result.fetched,
            failed = result.failed,
            "Resource events drain complete"
        );
    }
    Ok(result)
}
/// Store fetched resource events in the database.
///
/// All three event families (state, label, milestone) are written inside one
/// transaction so an entity's events land atomically.
fn store_resource_events(
    conn: &Connection,
    project_id: i64,
    entity_type: &str,
    entity_local_id: i64,
    state_events: &[crate::gitlab::types::GitLabStateEvent],
    label_events: &[crate::gitlab::types::GitLabLabelEvent],
    milestone_events: &[crate::gitlab::types::GitLabMilestoneEvent],
) -> Result<()> {
    use crate::core::events_db::{upsert_label_events, upsert_milestone_events, upsert_state_events};
    // unchecked_transaction: we hold only a shared &Connection, so we assert to
    // rusqlite that no other transaction is active on this connection.
    let txn = conn.unchecked_transaction()?;
    // Each upsert is skipped entirely when its slice is empty.
    if !state_events.is_empty() {
        upsert_state_events(&txn, project_id, entity_type, entity_local_id, state_events)?;
    }
    if !label_events.is_empty() {
        upsert_label_events(&txn, project_id, entity_type, entity_local_id, label_events)?;
    }
    if !milestone_events.is_empty() {
        upsert_milestone_events(&txn, project_id, entity_type, entity_local_id, milestone_events)?;
    }
    txn.commit()?;
    Ok(())
}
#[cfg(test)]
mod tests {
    use super::*;
    #[test]
    fn result_default_has_zero_counts() {
        let result = IngestProjectResult::default();
        assert_eq!(result.issues_fetched, 0);
        assert_eq!(result.issues_upserted, 0);
        assert_eq!(result.labels_created, 0);
        assert_eq!(result.discussions_fetched, 0);
        // Was missing: the MR counterpart test covers discussions_upserted,
        // but this test silently skipped it.
        assert_eq!(result.discussions_upserted, 0);
        assert_eq!(result.notes_upserted, 0);
        assert_eq!(result.issues_synced_discussions, 0);
        assert_eq!(result.issues_skipped_discussion_sync, 0);
        assert_eq!(result.resource_events_fetched, 0);
        assert_eq!(result.resource_events_failed, 0);
    }
    #[test]
    fn mr_result_default_has_zero_counts() {
        let result = IngestMrProjectResult::default();
        assert_eq!(result.mrs_fetched, 0);
        assert_eq!(result.mrs_upserted, 0);
        assert_eq!(result.labels_created, 0);
        assert_eq!(result.assignees_linked, 0);
        assert_eq!(result.reviewers_linked, 0);
        assert_eq!(result.discussions_fetched, 0);
        assert_eq!(result.discussions_upserted, 0);
        assert_eq!(result.notes_upserted, 0);
        assert_eq!(result.notes_skipped_bad_timestamp, 0);
        assert_eq!(result.diffnotes_count, 0);
        assert_eq!(result.mrs_synced_discussions, 0);
        assert_eq!(result.mrs_skipped_discussion_sync, 0);
        assert_eq!(result.resource_events_fetched, 0);
        assert_eq!(result.resource_events_failed, 0);
    }
    #[test]
    fn drain_result_default_has_zero_counts() {
        let result = DrainResult::default();
        assert_eq!(result.fetched, 0);
        assert_eq!(result.failed, 0);
    }
    #[test]
    fn progress_event_resource_variants_exist() {
        // Verify the new progress event variants are constructible
        let _start = ProgressEvent::ResourceEventsFetchStarted { total: 10 };
        let _progress = ProgressEvent::ResourceEventFetched {
            current: 5,
            total: 10,
        };
        let _complete = ProgressEvent::ResourceEventsFetchComplete {
            fetched: 8,
            failed: 2,
        };
    }
}