3 Commits

Author SHA1 Message Date
Taylor Eernisse
deafa88af5 perf: Concurrent resource event fetching, remove unnecessary async
client.rs:
- fetch_all_resource_events() now uses tokio::try_join!() to fire all
  three API requests (state, label, milestone events) concurrently
  instead of awaiting each sequentially. Because the three independent
  HTTP round-trips now overlap, wall-clock time drops to roughly one-third
  (an up-to-3x speedup) for entities with many events.

main.rs:
- Removed async from handle_issues() and handle_mrs(). These functions
  perform only synchronous database queries and formatting; they never
  await anything. Removing the async annotation avoids the overhead of
  an unnecessary Future state machine and makes the sync nature of
  these code paths explicit.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-03 14:09:44 -05:00
Taylor Eernisse
880ad1d3fa refactor(events): Lift transaction control to callers, eliminate duplicated store functions
events_db.rs:
- Removed internal savepoints from upsert_state_events,
  upsert_label_events, and upsert_milestone_events. Each function
  previously created its own savepoint, making it impossible for
  callers to wrap all three in a single atomic transaction.
- Changed signatures from &mut Connection to &Connection, since
  savepoints are no longer created internally. This makes the
  functions compatible with rusqlite::Transaction (which derefs to
  Connection), allowing callers to pass a transaction directly.

orchestrator.rs:
- Deleted the three store_*_events_tx() functions (store_state_events_tx,
  store_label_events_tx, store_milestone_events_tx) which were
  hand-duplicated copies of the events_db upsert functions, created as
  a workaround for the &mut Connection requirement. Now that events_db
  accepts &Connection, store_resource_events() calls the canonical
  upsert functions directly through the unchecked_transaction.
- Replaced the max-iterations guard in drain_resource_events() with a
  HashSet-based deduplication of job IDs. The old guard used an
  arbitrary 2x multiplier on total_pending which could either terminate
  too early (if many retries were legitimate) or too late. The new
  approach precisely prevents reprocessing the same job within a single
  drain run, which is the actual invariant we need.

Net effect: ~133 lines of duplicated SQL removed, single source of
truth for event upsert logic, and callers control transaction scope.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-03 14:09:35 -05:00
Taylor Eernisse
4c0123426a fix: Content hash now computed after truncation, atomic job claiming
Two bug fixes:

1. extractor.rs: The content hash was computed on the pre-truncation
   content, so the hash stored in the document didn't correspond to the
   actual persisted (truncated) content. In particular, a change confined
   to the portion removed by truncation would alter the hash and trigger
   spurious re-processing even though the stored content was identical.
   The hash is now computed after truncate_hard_cap() so it always
   matches the persisted content.

2. dependent_queue.rs: claim_jobs() had a TOCTOU race between the
   SELECT that found available jobs and the UPDATE that locked them.
   Under concurrent callers, two drain runs could claim the same job.
   Replaced with a single UPDATE ... RETURNING statement that
   atomically selects and locks jobs in one operation.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-03 14:09:22 -05:00
6 changed files with 74 additions and 229 deletions

View File

@@ -56,8 +56,8 @@ pub fn enqueue_job(
/// Claim a batch of jobs for processing. /// Claim a batch of jobs for processing.
/// ///
/// Atomically sets `locked_at` on the claimed jobs. Only claims jobs where /// Atomically selects and locks jobs within a transaction. Only claims jobs
/// `locked_at IS NULL` and `(next_retry_at IS NULL OR next_retry_at <= now)`. /// where `locked_at IS NULL` and `(next_retry_at IS NULL OR next_retry_at <= now)`.
pub fn claim_jobs(conn: &Connection, job_type: &str, batch_size: usize) -> Result<Vec<PendingJob>> { pub fn claim_jobs(conn: &Connection, job_type: &str, batch_size: usize) -> Result<Vec<PendingJob>> {
if batch_size == 0 { if batch_size == 0 {
return Ok(Vec::new()); return Ok(Vec::new());
@@ -65,19 +65,25 @@ pub fn claim_jobs(conn: &Connection, job_type: &str, batch_size: usize) -> Resul
let now = now_ms(); let now = now_ms();
// Find available jobs // Use UPDATE ... RETURNING to atomically select and lock in one statement.
let mut select_stmt = conn.prepare_cached( // This eliminates the race between SELECT and UPDATE.
"SELECT id, project_id, entity_type, entity_iid, entity_local_id, job_type, payload_json, attempts let mut stmt = conn.prepare_cached(
FROM pending_dependent_fetches "UPDATE pending_dependent_fetches
WHERE job_type = ?1 SET locked_at = ?1
AND locked_at IS NULL WHERE id IN (
AND (next_retry_at IS NULL OR next_retry_at <= ?2) SELECT id FROM pending_dependent_fetches
ORDER BY enqueued_at ASC WHERE job_type = ?2
LIMIT ?3", AND locked_at IS NULL
AND (next_retry_at IS NULL OR next_retry_at <= ?1)
ORDER BY enqueued_at ASC
LIMIT ?3
)
RETURNING id, project_id, entity_type, entity_iid, entity_local_id,
job_type, payload_json, attempts",
)?; )?;
let jobs: Vec<PendingJob> = select_stmt let jobs = stmt
.query_map(rusqlite::params![job_type, now, batch_size as i64], |row| { .query_map(rusqlite::params![now, job_type, batch_size as i64], |row| {
Ok(PendingJob { Ok(PendingJob {
id: row.get(0)?, id: row.get(0)?,
project_id: row.get(1)?, project_id: row.get(1)?,
@@ -91,18 +97,6 @@ pub fn claim_jobs(conn: &Connection, job_type: &str, batch_size: usize) -> Resul
})? })?
.collect::<std::result::Result<Vec<_>, _>>()?; .collect::<std::result::Result<Vec<_>, _>>()?;
// Lock the claimed jobs
if !jobs.is_empty() {
let ids: Vec<String> = jobs.iter().map(|j| j.id.to_string()).collect();
let placeholders = ids.join(",");
conn.execute(
&format!(
"UPDATE pending_dependent_fetches SET locked_at = ?1 WHERE id IN ({placeholders})"
),
rusqlite::params![now],
)?;
}
Ok(jobs) Ok(jobs)
} }

View File

@@ -9,9 +9,9 @@ use crate::gitlab::types::{GitLabLabelEvent, GitLabMilestoneEvent, GitLabStateEv
/// Upsert state events for an entity. /// Upsert state events for an entity.
/// ///
/// Uses INSERT OR REPLACE keyed on UNIQUE(gitlab_id, project_id). /// Uses INSERT OR REPLACE keyed on UNIQUE(gitlab_id, project_id).
/// Wraps in a savepoint for atomicity per entity. /// Caller is responsible for wrapping in a transaction if atomicity is needed.
pub fn upsert_state_events( pub fn upsert_state_events(
conn: &mut Connection, conn: &Connection,
project_id: i64, project_id: i64,
entity_type: &str, entity_type: &str,
entity_local_id: i64, entity_local_id: i64,
@@ -19,9 +19,7 @@ pub fn upsert_state_events(
) -> Result<usize> { ) -> Result<usize> {
let (issue_id, merge_request_id) = resolve_entity_ids(entity_type, entity_local_id)?; let (issue_id, merge_request_id) = resolve_entity_ids(entity_type, entity_local_id)?;
let sp = conn.savepoint()?; let mut stmt = conn.prepare_cached(
let mut stmt = sp.prepare_cached(
"INSERT OR REPLACE INTO resource_state_events "INSERT OR REPLACE INTO resource_state_events
(gitlab_id, project_id, issue_id, merge_request_id, state, (gitlab_id, project_id, issue_id, merge_request_id, state,
actor_gitlab_id, actor_username, created_at, actor_gitlab_id, actor_username, created_at,
@@ -51,15 +49,13 @@ pub fn upsert_state_events(
count += 1; count += 1;
} }
drop(stmt);
sp.commit()?;
Ok(count) Ok(count)
} }
/// Upsert label events for an entity. /// Upsert label events for an entity.
/// Caller is responsible for wrapping in a transaction if atomicity is needed.
pub fn upsert_label_events( pub fn upsert_label_events(
conn: &mut Connection, conn: &Connection,
project_id: i64, project_id: i64,
entity_type: &str, entity_type: &str,
entity_local_id: i64, entity_local_id: i64,
@@ -67,9 +63,7 @@ pub fn upsert_label_events(
) -> Result<usize> { ) -> Result<usize> {
let (issue_id, merge_request_id) = resolve_entity_ids(entity_type, entity_local_id)?; let (issue_id, merge_request_id) = resolve_entity_ids(entity_type, entity_local_id)?;
let sp = conn.savepoint()?; let mut stmt = conn.prepare_cached(
let mut stmt = sp.prepare_cached(
"INSERT OR REPLACE INTO resource_label_events "INSERT OR REPLACE INTO resource_label_events
(gitlab_id, project_id, issue_id, merge_request_id, action, (gitlab_id, project_id, issue_id, merge_request_id, action,
label_name, actor_gitlab_id, actor_username, created_at) label_name, actor_gitlab_id, actor_username, created_at)
@@ -96,15 +90,13 @@ pub fn upsert_label_events(
count += 1; count += 1;
} }
drop(stmt);
sp.commit()?;
Ok(count) Ok(count)
} }
/// Upsert milestone events for an entity. /// Upsert milestone events for an entity.
/// Caller is responsible for wrapping in a transaction if atomicity is needed.
pub fn upsert_milestone_events( pub fn upsert_milestone_events(
conn: &mut Connection, conn: &Connection,
project_id: i64, project_id: i64,
entity_type: &str, entity_type: &str,
entity_local_id: i64, entity_local_id: i64,
@@ -112,9 +104,7 @@ pub fn upsert_milestone_events(
) -> Result<usize> { ) -> Result<usize> {
let (issue_id, merge_request_id) = resolve_entity_ids(entity_type, entity_local_id)?; let (issue_id, merge_request_id) = resolve_entity_ids(entity_type, entity_local_id)?;
let sp = conn.savepoint()?; let mut stmt = conn.prepare_cached(
let mut stmt = sp.prepare_cached(
"INSERT OR REPLACE INTO resource_milestone_events "INSERT OR REPLACE INTO resource_milestone_events
(gitlab_id, project_id, issue_id, merge_request_id, action, (gitlab_id, project_id, issue_id, merge_request_id, action,
milestone_title, milestone_id, actor_gitlab_id, actor_username, created_at) milestone_title, milestone_id, actor_gitlab_id, actor_username, created_at)
@@ -142,9 +132,6 @@ pub fn upsert_milestone_events(
count += 1; count += 1;
} }
drop(stmt);
sp.commit()?;
Ok(count) Ok(count)
} }

View File

@@ -166,12 +166,12 @@ pub fn extract_issue_document(conn: &Connection, issue_id: i64) -> Result<Option
content.push_str(desc); content.push_str(desc);
} }
let content_hash = compute_content_hash(&content);
let labels_hash = compute_list_hash(&labels); let labels_hash = compute_list_hash(&labels);
let paths_hash = compute_list_hash(&[]); // Issues have no paths let paths_hash = compute_list_hash(&[]); // Issues have no paths
// Apply hard cap truncation for safety // Apply hard cap truncation for safety, then hash the final stored content
let hard_cap = truncate_hard_cap(&content); let hard_cap = truncate_hard_cap(&content);
let content_hash = compute_content_hash(&hard_cap.content);
Ok(Some(DocumentData { Ok(Some(DocumentData {
source_type: SourceType::Issue, source_type: SourceType::Issue,
@@ -281,12 +281,12 @@ pub fn extract_mr_document(conn: &Connection, mr_id: i64) -> Result<Option<Docum
content.push_str(desc); content.push_str(desc);
} }
let content_hash = compute_content_hash(&content);
let labels_hash = compute_list_hash(&labels); let labels_hash = compute_list_hash(&labels);
let paths_hash = compute_list_hash(&[]); let paths_hash = compute_list_hash(&[]);
// Apply hard cap truncation for safety // Apply hard cap truncation for safety, then hash the final stored content
let hard_cap = truncate_hard_cap(&content); let hard_cap = truncate_hard_cap(&content);
let content_hash = compute_content_hash(&hard_cap.content);
Ok(Some(DocumentData { Ok(Some(DocumentData {
source_type: SourceType::MergeRequest, source_type: SourceType::MergeRequest,

View File

@@ -662,7 +662,7 @@ impl GitLabClient {
self.fetch_all_pages(&path).await self.fetch_all_pages(&path).await
} }
/// Fetch all three event types for an entity in one call. /// Fetch all three event types for an entity concurrently.
pub async fn fetch_all_resource_events( pub async fn fetch_all_resource_events(
&self, &self,
gitlab_project_id: i64, gitlab_project_id: i64,
@@ -675,23 +675,19 @@ impl GitLabClient {
)> { )> {
match entity_type { match entity_type {
"issue" => { "issue" => {
let state = self let (state, label, milestone) = tokio::try_join!(
.fetch_issue_state_events(gitlab_project_id, iid) self.fetch_issue_state_events(gitlab_project_id, iid),
.await?; self.fetch_issue_label_events(gitlab_project_id, iid),
let label = self self.fetch_issue_milestone_events(gitlab_project_id, iid),
.fetch_issue_label_events(gitlab_project_id, iid) )?;
.await?;
let milestone = self
.fetch_issue_milestone_events(gitlab_project_id, iid)
.await?;
Ok((state, label, milestone)) Ok((state, label, milestone))
} }
"merge_request" => { "merge_request" => {
let state = self.fetch_mr_state_events(gitlab_project_id, iid).await?; let (state, label, milestone) = tokio::try_join!(
let label = self.fetch_mr_label_events(gitlab_project_id, iid).await?; self.fetch_mr_state_events(gitlab_project_id, iid),
let milestone = self self.fetch_mr_label_events(gitlab_project_id, iid),
.fetch_mr_milestone_events(gitlab_project_id, iid) self.fetch_mr_milestone_events(gitlab_project_id, iid),
.await?; )?;
Ok((state, label, milestone)) Ok((state, label, milestone))
} }
_ => Err(LoreError::Other(format!( _ => Err(LoreError::Other(format!(

View File

@@ -540,36 +540,31 @@ async fn drain_resource_events(
}); });
let mut processed = 0; let mut processed = 0;
let mut seen_job_ids = std::collections::HashSet::new();
// Max iterations guard: prevent infinite loop if jobs keep failing and retrying
// within the same drain run. Allow 2x total_pending iterations as safety margin.
let max_iterations = total_pending * 2;
let mut iterations = 0;
loop { loop {
if iterations >= max_iterations {
warn!(
iterations,
total_pending, "Resource events drain hit max iterations guard, stopping"
);
break;
}
let jobs = claim_jobs(conn, "resource_events", batch_size)?; let jobs = claim_jobs(conn, "resource_events", batch_size)?;
if jobs.is_empty() { if jobs.is_empty() {
break; break;
} }
for job in &jobs { for job in &jobs {
iterations += 1; // Guard against re-processing a job that was failed and re-claimed
// within the same drain run (shouldn't happen due to backoff, but
// defensive against clock skew or zero-backoff edge cases).
if !seen_job_ids.insert(job.id) {
warn!(
job_id = job.id,
"Skipping already-processed job in same drain run"
);
continue;
}
match client match client
.fetch_all_resource_events(gitlab_project_id, &job.entity_type, job.entity_iid) .fetch_all_resource_events(gitlab_project_id, &job.entity_type, job.entity_iid)
.await .await
{ {
Ok((state_events, label_events, milestone_events)) => { Ok((state_events, label_events, milestone_events)) => {
// Store events - we need &mut Connection for savepoints in upsert functions.
// Use unchecked_transaction as a workaround since we have &Connection.
let store_result = store_resource_events( let store_result = store_resource_events(
conn, conn,
job.project_id, job.project_id,
@@ -635,8 +630,7 @@ async fn drain_resource_events(
/// Store fetched resource events in the database. /// Store fetched resource events in the database.
/// ///
/// Uses unchecked_transaction to work with &Connection (not &mut Connection), /// Wraps all three event types in a single transaction for atomicity.
/// which is safe because we're single-threaded and using WAL mode.
fn store_resource_events( fn store_resource_events(
conn: &Connection, conn: &Connection,
project_id: i64, project_id: i64,
@@ -646,23 +640,30 @@ fn store_resource_events(
label_events: &[crate::gitlab::types::GitLabLabelEvent], label_events: &[crate::gitlab::types::GitLabLabelEvent],
milestone_events: &[crate::gitlab::types::GitLabMilestoneEvent], milestone_events: &[crate::gitlab::types::GitLabMilestoneEvent],
) -> Result<()> { ) -> Result<()> {
// The upsert functions require &mut Connection for savepoints.
// We use unchecked_transaction to wrap all three upserts atomically,
// then call the upsert functions using the transaction's inner connection.
let tx = conn.unchecked_transaction()?; let tx = conn.unchecked_transaction()?;
// State events - use raw SQL within transaction instead of upsert_state_events
// which requires &mut Connection
if !state_events.is_empty() { if !state_events.is_empty() {
store_state_events_tx(&tx, project_id, entity_type, entity_local_id, state_events)?; crate::core::events_db::upsert_state_events(
&tx,
project_id,
entity_type,
entity_local_id,
state_events,
)?;
} }
if !label_events.is_empty() { if !label_events.is_empty() {
store_label_events_tx(&tx, project_id, entity_type, entity_local_id, label_events)?; crate::core::events_db::upsert_label_events(
&tx,
project_id,
entity_type,
entity_local_id,
label_events,
)?;
} }
if !milestone_events.is_empty() { if !milestone_events.is_empty() {
store_milestone_events_tx( crate::core::events_db::upsert_milestone_events(
&tx, &tx,
project_id, project_id,
entity_type, entity_type,
@@ -675,139 +676,6 @@ fn store_resource_events(
Ok(()) Ok(())
} }
/// Store state events within an existing transaction.
fn store_state_events_tx(
tx: &rusqlite::Transaction<'_>,
project_id: i64,
entity_type: &str,
entity_local_id: i64,
events: &[crate::gitlab::types::GitLabStateEvent],
) -> Result<()> {
let (issue_id, merge_request_id): (Option<i64>, Option<i64>) = match entity_type {
"issue" => (Some(entity_local_id), None),
"merge_request" => (None, Some(entity_local_id)),
_ => return Ok(()),
};
let mut stmt = tx.prepare_cached(
"INSERT OR REPLACE INTO resource_state_events
(gitlab_id, project_id, issue_id, merge_request_id, state,
actor_gitlab_id, actor_username, created_at,
source_commit, source_merge_request_iid)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10)",
)?;
for event in events {
let created_at = crate::core::time::iso_to_ms_strict(&event.created_at)
.map_err(crate::core::error::LoreError::Other)?;
let actor_id = event.user.as_ref().map(|u| u.id);
let actor_username = event.user.as_ref().map(|u| u.username.as_str());
let source_mr_iid = event.source_merge_request.as_ref().map(|mr| mr.iid);
stmt.execute(rusqlite::params![
event.id,
project_id,
issue_id,
merge_request_id,
event.state,
actor_id,
actor_username,
created_at,
event.source_commit,
source_mr_iid,
])?;
}
Ok(())
}
/// Store label events within an existing transaction.
fn store_label_events_tx(
tx: &rusqlite::Transaction<'_>,
project_id: i64,
entity_type: &str,
entity_local_id: i64,
events: &[crate::gitlab::types::GitLabLabelEvent],
) -> Result<()> {
let (issue_id, merge_request_id): (Option<i64>, Option<i64>) = match entity_type {
"issue" => (Some(entity_local_id), None),
"merge_request" => (None, Some(entity_local_id)),
_ => return Ok(()),
};
let mut stmt = tx.prepare_cached(
"INSERT OR REPLACE INTO resource_label_events
(gitlab_id, project_id, issue_id, merge_request_id, action,
label_name, actor_gitlab_id, actor_username, created_at)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)",
)?;
for event in events {
let created_at = crate::core::time::iso_to_ms_strict(&event.created_at)
.map_err(crate::core::error::LoreError::Other)?;
let actor_id = event.user.as_ref().map(|u| u.id);
let actor_username = event.user.as_ref().map(|u| u.username.as_str());
stmt.execute(rusqlite::params![
event.id,
project_id,
issue_id,
merge_request_id,
event.action,
event.label.name,
actor_id,
actor_username,
created_at,
])?;
}
Ok(())
}
/// Store milestone events within an existing transaction.
fn store_milestone_events_tx(
tx: &rusqlite::Transaction<'_>,
project_id: i64,
entity_type: &str,
entity_local_id: i64,
events: &[crate::gitlab::types::GitLabMilestoneEvent],
) -> Result<()> {
let (issue_id, merge_request_id): (Option<i64>, Option<i64>) = match entity_type {
"issue" => (Some(entity_local_id), None),
"merge_request" => (None, Some(entity_local_id)),
_ => return Ok(()),
};
let mut stmt = tx.prepare_cached(
"INSERT OR REPLACE INTO resource_milestone_events
(gitlab_id, project_id, issue_id, merge_request_id, action,
milestone_title, milestone_id, actor_gitlab_id, actor_username, created_at)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10)",
)?;
for event in events {
let created_at = crate::core::time::iso_to_ms_strict(&event.created_at)
.map_err(crate::core::error::LoreError::Other)?;
let actor_id = event.user.as_ref().map(|u| u.id);
let actor_username = event.user.as_ref().map(|u| u.username.as_str());
stmt.execute(rusqlite::params![
event.id,
project_id,
issue_id,
merge_request_id,
event.action,
event.milestone.title,
event.milestone.id,
actor_id,
actor_username,
created_at,
])?;
}
Ok(())
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;

View File

@@ -72,8 +72,8 @@ async fn main() {
let quiet = cli.quiet; let quiet = cli.quiet;
let result = match cli.command { let result = match cli.command {
Commands::Issues(args) => handle_issues(cli.config.as_deref(), args, robot_mode).await, Commands::Issues(args) => handle_issues(cli.config.as_deref(), args, robot_mode),
Commands::Mrs(args) => handle_mrs(cli.config.as_deref(), args, robot_mode).await, Commands::Mrs(args) => handle_mrs(cli.config.as_deref(), args, robot_mode),
Commands::Search(args) => handle_search(cli.config.as_deref(), args, robot_mode).await, Commands::Search(args) => handle_search(cli.config.as_deref(), args, robot_mode).await,
Commands::Stats(args) => handle_stats(cli.config.as_deref(), args, robot_mode).await, Commands::Stats(args) => handle_stats(cli.config.as_deref(), args, robot_mode).await,
Commands::Embed(args) => handle_embed(cli.config.as_deref(), args, robot_mode).await, Commands::Embed(args) => handle_embed(cli.config.as_deref(), args, robot_mode).await,
@@ -284,7 +284,7 @@ fn handle_error(e: Box<dyn std::error::Error>, robot_mode: bool) -> ! {
// Primary command handlers // Primary command handlers
// ============================================================================ // ============================================================================
async fn handle_issues( fn handle_issues(
config_override: Option<&str>, config_override: Option<&str>,
args: IssuesArgs, args: IssuesArgs,
robot_mode: bool, robot_mode: bool,
@@ -334,7 +334,7 @@ async fn handle_issues(
Ok(()) Ok(())
} }
async fn handle_mrs( fn handle_mrs(
config_override: Option<&str>, config_override: Option<&str>,
args: MrsArgs, args: MrsArgs,
robot_mode: bool, robot_mode: bool,