From 880ad1d3faf5bd4d98d5c4ab2f266d08c2e6cc72 Mon Sep 17 00:00:00 2001 From: Taylor Eernisse Date: Tue, 3 Feb 2026 14:09:35 -0500 Subject: [PATCH] refactor(events): Lift transaction control to callers, eliminate duplicated store functions events_db.rs: - Removed internal savepoints from upsert_state_events, upsert_label_events, and upsert_milestone_events. Each function previously created its own savepoint, which required &mut Connection and therefore prevented callers from passing a transaction to wrap all three in a single atomic unit. - Changed signatures from &mut Connection to &Connection, since savepoints are no longer created internally. This makes the functions compatible with rusqlite::Transaction (which derefs to Connection), allowing callers to pass a transaction directly. orchestrator.rs: - Deleted the three store_*_events_tx() functions (store_state_events_tx, store_label_events_tx, store_milestone_events_tx) which were hand-duplicated copies of the events_db upsert functions, created as a workaround for the &mut Connection requirement. Now that events_db accepts &Connection, store_resource_events() calls the canonical upsert functions directly through the unchecked_transaction. - Replaced the max-iterations guard in drain_resource_events() with a HashSet-based deduplication of job IDs. The old guard used an arbitrary 2x multiplier on total_pending which could either terminate too early (if many retries were legitimate) or too late (a job that kept failing could still be re-processed repeatedly before the cap was reached). The new approach precisely prevents reprocessing the same job within a single drain run, which is the actual invariant we need. Net effect: ~133 lines of duplicated SQL removed, single source of truth for event upsert logic, and callers control transaction scope.
Co-Authored-By: Claude Opus 4.5 --- src/core/events_db.rs | 31 ++---- src/ingestion/orchestrator.rs | 186 +++++----------------------------- 2 files changed, 36 insertions(+), 181 deletions(-) diff --git a/src/core/events_db.rs b/src/core/events_db.rs index 0e5ee5b..5409a1d 100644 --- a/src/core/events_db.rs +++ b/src/core/events_db.rs @@ -9,9 +9,9 @@ use crate::gitlab::types::{GitLabLabelEvent, GitLabMilestoneEvent, GitLabStateEv /// Upsert state events for an entity. /// /// Uses INSERT OR REPLACE keyed on UNIQUE(gitlab_id, project_id). -/// Wraps in a savepoint for atomicity per entity. +/// Caller is responsible for wrapping in a transaction if atomicity is needed. pub fn upsert_state_events( - conn: &mut Connection, + conn: &Connection, project_id: i64, entity_type: &str, entity_local_id: i64, @@ -19,9 +19,7 @@ pub fn upsert_state_events( ) -> Result { let (issue_id, merge_request_id) = resolve_entity_ids(entity_type, entity_local_id)?; - let sp = conn.savepoint()?; - - let mut stmt = sp.prepare_cached( + let mut stmt = conn.prepare_cached( "INSERT OR REPLACE INTO resource_state_events (gitlab_id, project_id, issue_id, merge_request_id, state, actor_gitlab_id, actor_username, created_at, @@ -51,15 +49,13 @@ pub fn upsert_state_events( count += 1; } - drop(stmt); - sp.commit()?; - Ok(count) } /// Upsert label events for an entity. +/// Caller is responsible for wrapping in a transaction if atomicity is needed. 
pub fn upsert_label_events( - conn: &mut Connection, + conn: &Connection, project_id: i64, entity_type: &str, entity_local_id: i64, @@ -67,9 +63,7 @@ pub fn upsert_label_events( ) -> Result { let (issue_id, merge_request_id) = resolve_entity_ids(entity_type, entity_local_id)?; - let sp = conn.savepoint()?; - - let mut stmt = sp.prepare_cached( + let mut stmt = conn.prepare_cached( "INSERT OR REPLACE INTO resource_label_events (gitlab_id, project_id, issue_id, merge_request_id, action, label_name, actor_gitlab_id, actor_username, created_at) @@ -96,15 +90,13 @@ pub fn upsert_label_events( count += 1; } - drop(stmt); - sp.commit()?; - Ok(count) } /// Upsert milestone events for an entity. +/// Caller is responsible for wrapping in a transaction if atomicity is needed. pub fn upsert_milestone_events( - conn: &mut Connection, + conn: &Connection, project_id: i64, entity_type: &str, entity_local_id: i64, @@ -112,9 +104,7 @@ pub fn upsert_milestone_events( ) -> Result { let (issue_id, merge_request_id) = resolve_entity_ids(entity_type, entity_local_id)?; - let sp = conn.savepoint()?; - - let mut stmt = sp.prepare_cached( + let mut stmt = conn.prepare_cached( "INSERT OR REPLACE INTO resource_milestone_events (gitlab_id, project_id, issue_id, merge_request_id, action, milestone_title, milestone_id, actor_gitlab_id, actor_username, created_at) @@ -142,9 +132,6 @@ pub fn upsert_milestone_events( count += 1; } - drop(stmt); - sp.commit()?; - Ok(count) } diff --git a/src/ingestion/orchestrator.rs b/src/ingestion/orchestrator.rs index 859b729..b5ceca4 100644 --- a/src/ingestion/orchestrator.rs +++ b/src/ingestion/orchestrator.rs @@ -540,36 +540,31 @@ async fn drain_resource_events( }); let mut processed = 0; - - // Max iterations guard: prevent infinite loop if jobs keep failing and retrying - // within the same drain run. Allow 2x total_pending iterations as safety margin. 
- let max_iterations = total_pending * 2; - let mut iterations = 0; + let mut seen_job_ids = std::collections::HashSet::new(); loop { - if iterations >= max_iterations { - warn!( - iterations, - total_pending, "Resource events drain hit max iterations guard, stopping" - ); - break; - } - let jobs = claim_jobs(conn, "resource_events", batch_size)?; if jobs.is_empty() { break; } for job in &jobs { - iterations += 1; + // Guard against re-processing a job that was failed and re-claimed + // within the same drain run (shouldn't happen due to backoff, but + // defensive against clock skew or zero-backoff edge cases). + if !seen_job_ids.insert(job.id) { + warn!( + job_id = job.id, + "Skipping already-processed job in same drain run" + ); + continue; + } match client .fetch_all_resource_events(gitlab_project_id, &job.entity_type, job.entity_iid) .await { Ok((state_events, label_events, milestone_events)) => { - // Store events - we need &mut Connection for savepoints in upsert functions. - // Use unchecked_transaction as a workaround since we have &Connection. let store_result = store_resource_events( conn, job.project_id, @@ -635,8 +630,7 @@ async fn drain_resource_events( /// Store fetched resource events in the database. /// -/// Uses unchecked_transaction to work with &Connection (not &mut Connection), -/// which is safe because we're single-threaded and using WAL mode. +/// Wraps all three event types in a single transaction for atomicity. fn store_resource_events( conn: &Connection, project_id: i64, @@ -646,23 +640,30 @@ fn store_resource_events( label_events: &[crate::gitlab::types::GitLabLabelEvent], milestone_events: &[crate::gitlab::types::GitLabMilestoneEvent], ) -> Result<()> { - // The upsert functions require &mut Connection for savepoints. - // We use unchecked_transaction to wrap all three upserts atomically, - // then call the upsert functions using the transaction's inner connection. 
let tx = conn.unchecked_transaction()?; - // State events - use raw SQL within transaction instead of upsert_state_events - // which requires &mut Connection if !state_events.is_empty() { - store_state_events_tx(&tx, project_id, entity_type, entity_local_id, state_events)?; + crate::core::events_db::upsert_state_events( + &tx, + project_id, + entity_type, + entity_local_id, + state_events, + )?; } if !label_events.is_empty() { - store_label_events_tx(&tx, project_id, entity_type, entity_local_id, label_events)?; + crate::core::events_db::upsert_label_events( + &tx, + project_id, + entity_type, + entity_local_id, + label_events, + )?; } if !milestone_events.is_empty() { - store_milestone_events_tx( + crate::core::events_db::upsert_milestone_events( &tx, project_id, entity_type, @@ -675,139 +676,6 @@ fn store_resource_events( Ok(()) } -/// Store state events within an existing transaction. -fn store_state_events_tx( - tx: &rusqlite::Transaction<'_>, - project_id: i64, - entity_type: &str, - entity_local_id: i64, - events: &[crate::gitlab::types::GitLabStateEvent], -) -> Result<()> { - let (issue_id, merge_request_id): (Option, Option) = match entity_type { - "issue" => (Some(entity_local_id), None), - "merge_request" => (None, Some(entity_local_id)), - _ => return Ok(()), - }; - - let mut stmt = tx.prepare_cached( - "INSERT OR REPLACE INTO resource_state_events - (gitlab_id, project_id, issue_id, merge_request_id, state, - actor_gitlab_id, actor_username, created_at, - source_commit, source_merge_request_iid) - VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10)", - )?; - - for event in events { - let created_at = crate::core::time::iso_to_ms_strict(&event.created_at) - .map_err(crate::core::error::LoreError::Other)?; - let actor_id = event.user.as_ref().map(|u| u.id); - let actor_username = event.user.as_ref().map(|u| u.username.as_str()); - let source_mr_iid = event.source_merge_request.as_ref().map(|mr| mr.iid); - - stmt.execute(rusqlite::params![ - event.id, - 
project_id, - issue_id, - merge_request_id, - event.state, - actor_id, - actor_username, - created_at, - event.source_commit, - source_mr_iid, - ])?; - } - - Ok(()) -} - -/// Store label events within an existing transaction. -fn store_label_events_tx( - tx: &rusqlite::Transaction<'_>, - project_id: i64, - entity_type: &str, - entity_local_id: i64, - events: &[crate::gitlab::types::GitLabLabelEvent], -) -> Result<()> { - let (issue_id, merge_request_id): (Option, Option) = match entity_type { - "issue" => (Some(entity_local_id), None), - "merge_request" => (None, Some(entity_local_id)), - _ => return Ok(()), - }; - - let mut stmt = tx.prepare_cached( - "INSERT OR REPLACE INTO resource_label_events - (gitlab_id, project_id, issue_id, merge_request_id, action, - label_name, actor_gitlab_id, actor_username, created_at) - VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)", - )?; - - for event in events { - let created_at = crate::core::time::iso_to_ms_strict(&event.created_at) - .map_err(crate::core::error::LoreError::Other)?; - let actor_id = event.user.as_ref().map(|u| u.id); - let actor_username = event.user.as_ref().map(|u| u.username.as_str()); - - stmt.execute(rusqlite::params![ - event.id, - project_id, - issue_id, - merge_request_id, - event.action, - event.label.name, - actor_id, - actor_username, - created_at, - ])?; - } - - Ok(()) -} - -/// Store milestone events within an existing transaction. 
-fn store_milestone_events_tx( - tx: &rusqlite::Transaction<'_>, - project_id: i64, - entity_type: &str, - entity_local_id: i64, - events: &[crate::gitlab::types::GitLabMilestoneEvent], -) -> Result<()> { - let (issue_id, merge_request_id): (Option, Option) = match entity_type { - "issue" => (Some(entity_local_id), None), - "merge_request" => (None, Some(entity_local_id)), - _ => return Ok(()), - }; - - let mut stmt = tx.prepare_cached( - "INSERT OR REPLACE INTO resource_milestone_events - (gitlab_id, project_id, issue_id, merge_request_id, action, - milestone_title, milestone_id, actor_gitlab_id, actor_username, created_at) - VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10)", - )?; - - for event in events { - let created_at = crate::core::time::iso_to_ms_strict(&event.created_at) - .map_err(crate::core::error::LoreError::Other)?; - let actor_id = event.user.as_ref().map(|u| u.id); - let actor_username = event.user.as_ref().map(|u| u.username.as_str()); - - stmt.execute(rusqlite::params![ - event.id, - project_id, - issue_id, - merge_request_id, - event.action, - event.milestone.title, - event.milestone.id, - actor_id, - actor_username, - created_at, - ])?; - } - - Ok(()) -} - #[cfg(test)] mod tests { use super::*;