From 9107a78b5704b9e27eed89b27ea72a8e5e16b05f Mon Sep 17 00:00:00 2001 From: teernisse Date: Fri, 6 Mar 2026 11:28:08 -0500 Subject: [PATCH] perf(ingestion): replace per-row INSERT loops with chunked batch INSERTs The issue and MR ingestion paths previously inserted labels, assignees, and reviewers one row at a time inside a transaction. For entities with many labels or assignees, this issued N separate SQLite statements where a single multi-row INSERT suffices. Replace the per-row loops with batch INSERT functions that build a single `INSERT OR IGNORE ... VALUES (?1,?2),(?1,?3),...` statement per chunk. Chunks are capped at 400 rows (BATCH_LINK_ROWS_MAX) to stay comfortably below SQLite's default 999 bind-parameter limit. Affected paths: - issues.rs: link_issue_labels_batch_tx, insert_issue_assignees_batch_tx - merge_requests.rs: insert_mr_labels_batch_tx, insert_mr_assignees_batch_tx, insert_mr_reviewers_batch_tx New tests verify deduplication (OR IGNORE), multi-chunk correctness, and equivalence with the old per-row approach. A perf benchmark (bench_issue_assignee_insert_individual_vs_batch) demonstrates the speedup across representative assignee set sizes. Co-Authored-By: Claude Opus 4.6 --- src/ingestion/issues.rs | 78 ++++++++-- src/ingestion/issues_tests.rs | 101 +++++++++++++ src/ingestion/merge_requests.rs | 242 +++++++++++++++++++++++++++++--- tests/perf_benchmark.rs | 141 +++++++++++++++++++ 4 files changed, 534 insertions(+), 28 deletions(-) diff --git a/src/ingestion/issues.rs b/src/ingestion/issues.rs index 48da301..7b7b29e 100644 --- a/src/ingestion/issues.rs +++ b/src/ingestion/issues.rs @@ -36,6 +36,9 @@ struct SyncCursor { tie_breaker_id: Option, } +// Keep comfortably below SQLite's default 999 bind-parameter limit. +const BATCH_LINK_ROWS_MAX: usize = 400; + pub async fn ingest_issues( conn: &Connection, client: &GitLabClient, @@ -252,22 +255,19 @@ fn process_issue_in_transaction( [local_issue_id], )?; + let mut label_ids = Vec::with_capacity(label_names.len()); for label_name in label_names { let label_id = upsert_label_tx(tx, project_id, label_name, &mut labels_created)?; - link_issue_label_tx(tx, local_issue_id, label_id)?; + label_ids.push(label_id); } + link_issue_labels_batch_tx(tx, local_issue_id, &label_ids)?; tx.execute( "DELETE FROM issue_assignees WHERE issue_id = ?", [local_issue_id], )?; - for username in assignee_usernames { - tx.execute( - "INSERT OR IGNORE INTO issue_assignees (issue_id, username) VALUES (?, ?)", - (local_issue_id, username), - )?; - } + insert_issue_assignees_batch_tx(tx, local_issue_id, assignee_usernames)?; Ok(labels_created) } @@ -296,11 +296,65 @@ fn upsert_label_tx( Ok(id) } -fn link_issue_label_tx(tx: &Transaction<'_>, issue_id: i64, label_id: i64) -> Result<()> { - tx.execute( - "INSERT OR IGNORE INTO issue_labels (issue_id, label_id) VALUES (?, ?)", - (issue_id, label_id), - )?; +fn link_issue_labels_batch_tx( + tx: &Transaction<'_>, + issue_id: i64, + label_ids: &[i64], +) -> Result<()> { + if label_ids.is_empty() { + return Ok(()); + } + + for chunk in label_ids.chunks(BATCH_LINK_ROWS_MAX) { + let placeholders = (0..chunk.len()) + .map(|idx| format!("(?1, ?{})", idx + 2)) + .collect::>() + .join(", "); + let sql = format!( + "INSERT OR IGNORE INTO issue_labels (issue_id, label_id) VALUES {}", + placeholders + ); + + let mut params: Vec<&dyn rusqlite::types::ToSql> = Vec::with_capacity(chunk.len() + 1); + params.push(&issue_id); + for label_id in chunk { + params.push(label_id); + } + + tx.execute(&sql, params.as_slice())?; + } + + Ok(()) +} + +fn insert_issue_assignees_batch_tx( + tx: &Transaction<'_>, + issue_id: i64, + usernames: &[String], +) -> Result<()> { + if usernames.is_empty() { + return Ok(()); + } + + for chunk in usernames.chunks(BATCH_LINK_ROWS_MAX) { + let placeholders = (0..chunk.len()) + .map(|idx| format!("(?1, ?{})", idx + 2)) + .collect::>() + .join(", "); + let sql = format!( + "INSERT OR IGNORE INTO issue_assignees (issue_id, username) VALUES {}", + placeholders + ); + + let mut params: Vec<&dyn rusqlite::types::ToSql> = Vec::with_capacity(chunk.len() + 1); + params.push(&issue_id); + for username in chunk { + params.push(username); + } + + tx.execute(&sql, params.as_slice())?; + } + Ok(()) } diff --git a/src/ingestion/issues_tests.rs b/src/ingestion/issues_tests.rs index 59b6b71..86b788f 100644 --- a/src/ingestion/issues_tests.rs +++ b/src/ingestion/issues_tests.rs @@ -1,5 +1,6 @@ use super::*; use crate::gitlab::types::GitLabAuthor; +use rusqlite::Connection; fn passes_cursor_filter(issue: &GitLabIssue, cursor: &SyncCursor) -> Result { let Some(cursor_ts) = cursor.updated_at_cursor else { @@ -47,6 +48,26 @@ fn make_test_issue(id: i64, updated_at: &str) -> GitLabIssue { } } +fn setup_link_tables() -> Connection { + let conn = Connection::open_in_memory().unwrap(); + conn.execute_batch( + " + CREATE TABLE issue_labels ( + issue_id INTEGER NOT NULL, + label_id INTEGER NOT NULL, + PRIMARY KEY(issue_id, label_id) + ); + CREATE TABLE issue_assignees ( + issue_id INTEGER NOT NULL, + username TEXT NOT NULL, + PRIMARY KEY(issue_id, username) + ); + ", + ) + .unwrap(); + conn +} + #[test] fn cursor_filter_allows_newer_issues() { let cursor = SyncCursor { @@ -93,3 +114,83 @@ fn cursor_filter_allows_all_when_no_cursor() { let issue = make_test_issue(1, "2020-01-01T00:00:00.000Z"); assert!(passes_cursor_filter(&issue, &cursor).unwrap_or(false)); } + +#[test] +fn batch_issue_label_insert_deduplicates_ids() { + let conn = setup_link_tables(); + let tx = conn.unchecked_transaction().unwrap(); + + link_issue_labels_batch_tx(&tx, 42, &[9, 3, 9, 1]).unwrap(); + tx.commit().unwrap(); + + let ids: Vec = conn + .prepare("SELECT label_id FROM issue_labels WHERE issue_id = 42 ORDER BY label_id") + .unwrap() + .query_map([], |row| row.get(0)) + .unwrap() + .collect::, _>>() + .unwrap(); + assert_eq!(ids, vec![1, 3, 9]); +} + +#[test] +fn batch_issue_assignee_insert_deduplicates_usernames() { + let conn = setup_link_tables(); + let tx = conn.unchecked_transaction().unwrap(); + + let users = vec![ + "alice".to_string(), + "bob".to_string(), + "alice".to_string(), + "carol".to_string(), + ]; + insert_issue_assignees_batch_tx(&tx, 7, &users).unwrap(); + tx.commit().unwrap(); + + let names: Vec = conn + .prepare("SELECT username FROM issue_assignees WHERE issue_id = 7 ORDER BY username") + .unwrap() + .query_map([], |row| row.get(0)) + .unwrap() + .collect::, _>>() + .unwrap(); + assert_eq!(names, vec!["alice", "bob", "carol"]); +} + +#[test] +fn batch_issue_links_handle_multiple_chunks() { + let conn = setup_link_tables(); + let tx = conn.unchecked_transaction().unwrap(); + + let mut label_ids: Vec = (0..(BATCH_LINK_ROWS_MAX as i64 + 3)) + .map(|idx| idx + 10) + .collect(); + label_ids.push(10); + + let mut users: Vec = (0..(BATCH_LINK_ROWS_MAX + 5)) + .map(|idx| format!("user-{idx}")) + .collect(); + users.push("user-0".to_string()); + + link_issue_labels_batch_tx(&tx, 11, &label_ids).unwrap(); + insert_issue_assignees_batch_tx(&tx, 11, &users).unwrap(); + tx.commit().unwrap(); + + let label_count: i64 = conn + .query_row( + "SELECT COUNT(*) FROM issue_labels WHERE issue_id = 11", + [], + |row| row.get(0), + ) + .unwrap(); + let user_count: i64 = conn + .query_row( + "SELECT COUNT(*) FROM issue_assignees WHERE issue_id = 11", + [], + |row| row.get(0), + ) + .unwrap(); + + assert_eq!(label_count, BATCH_LINK_ROWS_MAX as i64 + 3); + assert_eq!(user_count, (BATCH_LINK_ROWS_MAX + 5) as i64); +} diff --git a/src/ingestion/merge_requests.rs b/src/ingestion/merge_requests.rs index baa5763..94d51ab 100644 --- a/src/ingestion/merge_requests.rs +++ b/src/ingestion/merge_requests.rs @@ -36,6 +36,9 @@ struct SyncCursor { tie_breaker_id: Option, } +// Keep comfortably below SQLite's default 999 bind-parameter limit. +const BATCH_LINK_ROWS_MAX: usize = 400; + pub async fn ingest_merge_requests( conn: &Connection, client: &GitLabClient, @@ -252,37 +255,26 @@ fn process_mr_in_transaction( "DELETE FROM mr_labels WHERE merge_request_id = ?", [local_mr_id], )?; + let mut label_ids = Vec::with_capacity(transformed.label_names.len()); for label_name in &transformed.label_names { let label_id = upsert_label_tx(tx, project_id, label_name, &mut labels_created)?; - tx.execute( - "INSERT OR IGNORE INTO mr_labels (merge_request_id, label_id) VALUES (?, ?)", - (local_mr_id, label_id), - )?; + label_ids.push(label_id); } + insert_mr_labels_batch_tx(tx, local_mr_id, &label_ids)?; tx.execute( "DELETE FROM mr_assignees WHERE merge_request_id = ?", [local_mr_id], )?; let assignees_linked = transformed.assignee_usernames.len(); - for username in &transformed.assignee_usernames { - tx.execute( - "INSERT OR IGNORE INTO mr_assignees (merge_request_id, username) VALUES (?, ?)", - (local_mr_id, username), - )?; - } + insert_mr_assignees_batch_tx(tx, local_mr_id, &transformed.assignee_usernames)?; tx.execute( "DELETE FROM mr_reviewers WHERE merge_request_id = ?", [local_mr_id], )?; let reviewers_linked = transformed.reviewer_usernames.len(); - for username in &transformed.reviewer_usernames { - tx.execute( - "INSERT OR IGNORE INTO mr_reviewers (merge_request_id, username) VALUES (?, ?)", - (local_mr_id, username), - )?; - } + insert_mr_reviewers_batch_tx(tx, local_mr_id, &transformed.reviewer_usernames)?; Ok(ProcessMrResult { labels_created, @@ -315,6 +307,99 @@ fn upsert_label_tx( Ok(id) } +fn insert_mr_labels_batch_tx( + tx: &Transaction<'_>, + merge_request_id: i64, + label_ids: &[i64], +) -> Result<()> { + if label_ids.is_empty() { + return Ok(()); + } + + for chunk in label_ids.chunks(BATCH_LINK_ROWS_MAX) { + let placeholders = (0..chunk.len()) + .map(|idx| format!("(?1, ?{})", idx + 2)) + .collect::>() + .join(", "); + let sql = format!( + "INSERT OR IGNORE INTO mr_labels (merge_request_id, label_id) VALUES {}", + placeholders + ); + + let mut params: Vec<&dyn rusqlite::types::ToSql> = Vec::with_capacity(chunk.len() + 1); + params.push(&merge_request_id); + for label_id in chunk { + params.push(label_id); + } + + tx.execute(&sql, params.as_slice())?; + } + + Ok(()) +} + +fn insert_mr_assignees_batch_tx( + tx: &Transaction<'_>, + merge_request_id: i64, + usernames: &[String], +) -> Result<()> { + if usernames.is_empty() { + return Ok(()); + } + + for chunk in usernames.chunks(BATCH_LINK_ROWS_MAX) { + let placeholders = (0..chunk.len()) + .map(|idx| format!("(?1, ?{})", idx + 2)) + .collect::>() + .join(", "); + let sql = format!( + "INSERT OR IGNORE INTO mr_assignees (merge_request_id, username) VALUES {}", + placeholders + ); + + let mut params: Vec<&dyn rusqlite::types::ToSql> = Vec::with_capacity(chunk.len() + 1); + params.push(&merge_request_id); + for username in chunk { + params.push(username); + } + + tx.execute(&sql, params.as_slice())?; + } + + Ok(()) +} + +fn insert_mr_reviewers_batch_tx( + tx: &Transaction<'_>, + merge_request_id: i64, + usernames: &[String], +) -> Result<()> { + if usernames.is_empty() { + return Ok(()); + } + + for chunk in usernames.chunks(BATCH_LINK_ROWS_MAX) { + let placeholders = (0..chunk.len()) + .map(|idx| format!("(?1, ?{})", idx + 2)) + .collect::>() + .join(", "); + let sql = format!( + "INSERT OR IGNORE INTO mr_reviewers (merge_request_id, username) VALUES {}", + placeholders + ); + + let mut params: Vec<&dyn rusqlite::types::ToSql> = Vec::with_capacity(chunk.len() + 1); + params.push(&merge_request_id); + for username in chunk { + params.push(username); + } + + tx.execute(&sql, params.as_slice())?; + } + + Ok(()) +} + fn passes_cursor_filter_with_ts(gitlab_id: i64, mr_ts: i64, cursor: &SyncCursor) -> bool { let Some(cursor_ts) = cursor.updated_at_cursor else { return true; @@ -425,6 +510,32 @@ fn parse_timestamp(ts: &str) -> Result { #[cfg(test)] mod tests { use super::*; + use rusqlite::Connection; + + fn setup_link_tables() -> Connection { + let conn = Connection::open_in_memory().unwrap(); + conn.execute_batch( + " + CREATE TABLE mr_labels ( + merge_request_id INTEGER NOT NULL, + label_id INTEGER NOT NULL, + PRIMARY KEY(merge_request_id, label_id) + ); + CREATE TABLE mr_assignees ( + merge_request_id INTEGER NOT NULL, + username TEXT NOT NULL, + PRIMARY KEY(merge_request_id, username) + ); + CREATE TABLE mr_reviewers ( + merge_request_id INTEGER NOT NULL, + username TEXT NOT NULL, + PRIMARY KEY(merge_request_id, username) + ); + ", + ) + .unwrap(); + conn + } #[test] fn result_default_has_zero_counts() { @@ -478,4 +589,103 @@ mod tests { let old_ts = 1577836800000; assert!(passes_cursor_filter_with_ts(1, old_ts, &cursor)); } + + #[test] + fn batch_mr_label_insert_deduplicates_ids() { + let conn = setup_link_tables(); + let tx = conn.unchecked_transaction().unwrap(); + + insert_mr_labels_batch_tx(&tx, 99, &[5, 2, 5, 1]).unwrap(); + tx.commit().unwrap(); + + let ids: Vec = conn + .prepare("SELECT label_id FROM mr_labels WHERE merge_request_id = 99 ORDER BY label_id") + .unwrap() + .query_map([], |row| row.get(0)) + .unwrap() + .collect::, _>>() + .unwrap(); + assert_eq!(ids, vec![1, 2, 5]); + } + + #[test] + fn batch_mr_assignee_and_reviewer_insert_deduplicates_usernames() { + let conn = setup_link_tables(); + let tx = conn.unchecked_transaction().unwrap(); + + let users = vec!["alice".to_string(), "bob".to_string(), "alice".to_string()]; + + insert_mr_assignees_batch_tx(&tx, 33, &users).unwrap(); + insert_mr_reviewers_batch_tx(&tx, 33, &users).unwrap(); + tx.commit().unwrap(); + + let assignees: Vec = conn + .prepare( + "SELECT username FROM mr_assignees WHERE merge_request_id = 33 ORDER BY username", + ) + .unwrap() + .query_map([], |row| row.get(0)) + .unwrap() + .collect::, _>>() + .unwrap(); + let reviewers: Vec = conn + .prepare( + "SELECT username FROM mr_reviewers WHERE merge_request_id = 33 ORDER BY username", + ) + .unwrap() + .query_map([], |row| row.get(0)) + .unwrap() + .collect::, _>>() + .unwrap(); + + assert_eq!(assignees, vec!["alice", "bob"]); + assert_eq!(reviewers, vec!["alice", "bob"]); + } + + #[test] + fn batch_mr_links_handle_multiple_chunks() { + let conn = setup_link_tables(); + let tx = conn.unchecked_transaction().unwrap(); + + let mut label_ids: Vec = (0..(BATCH_LINK_ROWS_MAX as i64 + 3)) + .map(|idx| idx + 100) + .collect(); + label_ids.push(100); + + let mut users: Vec = (0..(BATCH_LINK_ROWS_MAX + 5)) + .map(|idx| format!("user-{idx}")) + .collect(); + users.push("user-0".to_string()); + + insert_mr_labels_batch_tx(&tx, 77, &label_ids).unwrap(); + insert_mr_assignees_batch_tx(&tx, 77, &users).unwrap(); + insert_mr_reviewers_batch_tx(&tx, 77, &users).unwrap(); + tx.commit().unwrap(); + + let label_count: i64 = conn + .query_row( + "SELECT COUNT(*) FROM mr_labels WHERE merge_request_id = 77", + [], + |row| row.get(0), + ) + .unwrap(); + let assignee_count: i64 = conn + .query_row( + "SELECT COUNT(*) FROM mr_assignees WHERE merge_request_id = 77", + [], + |row| row.get(0), + ) + .unwrap(); + let reviewer_count: i64 = conn + .query_row( + "SELECT COUNT(*) FROM mr_reviewers WHERE merge_request_id = 77", + [], + |row| row.get(0), + ) + .unwrap(); + + assert_eq!(label_count, BATCH_LINK_ROWS_MAX as i64 + 3); + assert_eq!(assignee_count, (BATCH_LINK_ROWS_MAX + 5) as i64); + assert_eq!(reviewer_count, (BATCH_LINK_ROWS_MAX + 5) as i64); + } } diff --git a/tests/perf_benchmark.rs b/tests/perf_benchmark.rs index f6e7ead..1f3683c 100644 --- a/tests/perf_benchmark.rs +++ b/tests/perf_benchmark.rs @@ -54,6 +54,11 @@ fn setup_db() -> Connection { label_id INTEGER NOT NULL REFERENCES labels(id), PRIMARY KEY(issue_id, label_id) ); + CREATE TABLE issue_assignees ( + issue_id INTEGER NOT NULL REFERENCES issues(id), + username TEXT NOT NULL, + PRIMARY KEY(issue_id, username) + ); CREATE TABLE documents ( id INTEGER PRIMARY KEY, @@ -145,6 +150,55 @@ fn insert_labels_batch(conn: &Connection, doc_id: i64, labels: &[&str]) { } } +/// Simulate OLD ingestion approach: individual INSERT per assignee. +fn insert_issue_assignees_individual(conn: &Connection, issue_id: i64, usernames: &[&str]) { + conn.execute( + "DELETE FROM issue_assignees WHERE issue_id = ?1", + [issue_id], + ) + .unwrap(); + for username in usernames { + conn.execute( + "INSERT OR IGNORE INTO issue_assignees (issue_id, username) VALUES (?1, ?2)", + rusqlite::params![issue_id, username], + ) + .unwrap(); + } +} + +/// Simulate NEW ingestion approach: chunked batch INSERTs. +fn insert_issue_assignees_batch(conn: &Connection, issue_id: i64, usernames: &[&str]) { + conn.execute( + "DELETE FROM issue_assignees WHERE issue_id = ?1", + [issue_id], + ) + .unwrap(); + + if usernames.is_empty() { + return; + } + + const BATCH_ROWS_MAX: usize = 400; + for chunk in usernames.chunks(BATCH_ROWS_MAX) { + let placeholders = (0..chunk.len()) + .map(|idx| format!("(?1, ?{})", idx + 2)) + .collect::>() + .join(", "); + let sql = format!( + "INSERT OR IGNORE INTO issue_assignees (issue_id, username) VALUES {}", + placeholders + ); + + let mut params: Vec<&dyn rusqlite::types::ToSql> = Vec::with_capacity(chunk.len() + 1); + params.push(&issue_id); + for username in chunk { + params.push(username); + } + + conn.execute(&sql, params.as_slice()).unwrap(); + } +} + /// Simulate OLD string building: format! + push_str fn build_content_old( iid: i64, @@ -197,6 +251,22 @@ const LABEL_SETS: &[&[&str]] = &[ ], ]; +const ASSIGNEE_SETS: &[&[&str]] = &[ + &["alice", "bob", "carol", "dave", "eve", "alice", "bob"], + &[ + "frontend1", + "frontend2", + "frontend3", + "frontend4", + "frontend5", + ], + &["ops1", "ops2", "ops3", "ops1"], + &["writer1", "writer2"], + &[ + "oncall1", "oncall2", "oncall3", "oncall4", "oncall5", "oncall6", + ], +]; + #[test] fn bench_label_insert_individual_vs_batch() { let conn = setup_db(); @@ -273,6 +343,77 @@ fn bench_label_insert_individual_vs_batch() { ); } +#[test] +fn bench_issue_assignee_insert_individual_vs_batch() { + let conn = setup_db(); + + conn.execute( + "INSERT INTO issues (id, gitlab_id, project_id, iid, title, state, created_at, updated_at, last_seen_at) + VALUES (2, 20, 1, 43, 'Assignee Issue', 'opened', 1000, 2000, 3000)", + [], + ) + .unwrap(); + + let iterations = 20_000; + + // Warm up + for users in ASSIGNEE_SETS { + insert_issue_assignees_individual(&conn, 2, users); + insert_issue_assignees_batch(&conn, 2, users); + } + + // Benchmark OLD + let start = Instant::now(); + for i in 0..iterations { + let users = ASSIGNEE_SETS[i % ASSIGNEE_SETS.len()]; + insert_issue_assignees_individual(&conn, 2, users); + } + let old_elapsed = start.elapsed(); + + // Benchmark NEW + let start = Instant::now(); + for i in 0..iterations { + let users = ASSIGNEE_SETS[i % ASSIGNEE_SETS.len()]; + insert_issue_assignees_batch(&conn, 2, users); + } + let new_elapsed = start.elapsed(); + + let speedup = old_elapsed.as_nanos() as f64 / new_elapsed.as_nanos() as f64; + + println!( + "\n=== Issue Assignee INSERT Benchmark ({} iterations) ===", + iterations + ); + println!("Individual INSERTs: {:?}", old_elapsed); + println!("Batch INSERT: {:?}", new_elapsed); + println!("Speedup: {:.2}x", speedup); + println!(); + + // Verify correctness: both approaches produce identical rows. + insert_issue_assignees_individual(&conn, 2, &["alice", "bob", "alice", "carol"]); + let old_rows: Vec = conn + .prepare("SELECT username FROM issue_assignees WHERE issue_id = 2 ORDER BY username") + .unwrap() + .query_map([], |row| row.get(0)) + .unwrap() + .collect::, _>>() + .unwrap(); + + insert_issue_assignees_batch(&conn, 2, &["alice", "bob", "alice", "carol"]); + let new_rows: Vec = conn + .prepare("SELECT username FROM issue_assignees WHERE issue_id = 2 ORDER BY username") + .unwrap() + .query_map([], |row| row.get(0)) + .unwrap() + .collect::, _>>() + .unwrap(); + + assert_eq!( + old_rows, new_rows, + "Both approaches must produce identical rows" + ); +} + #[test] fn bench_string_building_old_vs_new() { let iterations = 50_000;