perf(ingestion): replace per-row INSERT loops with chunked batch INSERTs

The issue and MR ingestion paths previously inserted labels, assignees,
and reviewers one row at a time inside a transaction. For entities with
many labels or assignees, this issued N separate SQLite statements where
a single multi-row INSERT suffices.

Replace the per-row loops with batch INSERT functions that build a
single `INSERT OR IGNORE ... VALUES (?1,?2),(?1,?3),...` statement per
chunk. Chunks are capped at 400 rows (BATCH_LINK_ROWS_MAX) to stay
comfortably below SQLite's default 999 bind-parameter limit.

Affected paths:
- issues.rs: link_issue_labels_batch_tx, insert_issue_assignees_batch_tx
- merge_requests.rs: insert_mr_labels_batch_tx,
  insert_mr_assignees_batch_tx, insert_mr_reviewers_batch_tx

New tests verify deduplication (OR IGNORE), multi-chunk correctness,
and equivalence with the old per-row approach. A perf benchmark
(bench_issue_assignee_insert_individual_vs_batch) demonstrates the
speedup across representative assignee set sizes.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
teernisse
2026-03-06 11:28:08 -05:00
parent 5fb27b1fbb
commit 9107a78b57
4 changed files with 534 additions and 28 deletions

View File

@@ -36,6 +36,9 @@ struct SyncCursor {
tie_breaker_id: Option<i64>,
}
// Maximum link rows per multi-row INSERT chunk. Each row adds one bind
// parameter on top of the shared entity-id parameter (?1), so a full
// chunk binds 401 parameters — comfortably below SQLite's default
// 999 bind-parameter limit (SQLITE_MAX_VARIABLE_NUMBER).
const BATCH_LINK_ROWS_MAX: usize = 400;
pub async fn ingest_issues(
conn: &Connection,
client: &GitLabClient,
@@ -252,22 +255,19 @@ fn process_issue_in_transaction(
[local_issue_id],
)?;
let mut label_ids = Vec::with_capacity(label_names.len());
for label_name in label_names {
let label_id = upsert_label_tx(tx, project_id, label_name, &mut labels_created)?;
link_issue_label_tx(tx, local_issue_id, label_id)?;
label_ids.push(label_id);
}
link_issue_labels_batch_tx(tx, local_issue_id, &label_ids)?;
tx.execute(
"DELETE FROM issue_assignees WHERE issue_id = ?",
[local_issue_id],
)?;
for username in assignee_usernames {
tx.execute(
"INSERT OR IGNORE INTO issue_assignees (issue_id, username) VALUES (?, ?)",
(local_issue_id, username),
)?;
}
insert_issue_assignees_batch_tx(tx, local_issue_id, assignee_usernames)?;
Ok(labels_created)
}
@@ -296,11 +296,65 @@ fn upsert_label_tx(
Ok(id)
}
fn link_issue_label_tx(tx: &Transaction<'_>, issue_id: i64, label_id: i64) -> Result<()> {
tx.execute(
"INSERT OR IGNORE INTO issue_labels (issue_id, label_id) VALUES (?, ?)",
(issue_id, label_id),
)?;
/// Links `issue_id` to every id in `label_ids` using chunked multi-row
/// `INSERT OR IGNORE` statements.
///
/// Each chunk becomes one statement of the form
/// `INSERT OR IGNORE ... VALUES (?1, ?2), (?1, ?3), ...`, where `?1`
/// is the shared issue id. Chunks are capped at `BATCH_LINK_ROWS_MAX`
/// rows so the bind-parameter count stays under SQLite's limit.
/// Duplicate (issue_id, label_id) pairs are silently ignored.
fn link_issue_labels_batch_tx(
    tx: &Transaction<'_>,
    issue_id: i64,
    label_ids: &[i64],
) -> Result<()> {
    // `chunks` yields nothing for an empty slice, so empty input is a no-op.
    for chunk in label_ids.chunks(BATCH_LINK_ROWS_MAX) {
        // Row i binds its label id to placeholder ?(i + 2); ?1 is issue_id.
        let rows = (2..chunk.len() + 2)
            .map(|slot| format!("(?1, ?{slot})"))
            .collect::<Vec<_>>()
            .join(", ");
        let sql = format!(
            "INSERT OR IGNORE INTO issue_labels (issue_id, label_id) VALUES {rows}"
        );
        let binds: Vec<&dyn rusqlite::types::ToSql> =
            std::iter::once(&issue_id as &dyn rusqlite::types::ToSql)
                .chain(chunk.iter().map(|id| id as &dyn rusqlite::types::ToSql))
                .collect();
        tx.execute(&sql, binds.as_slice())?;
    }
    Ok(())
}
/// Inserts every username in `usernames` as an assignee of `issue_id`
/// using chunked multi-row `INSERT OR IGNORE` statements.
///
/// `?1` is bound once to the issue id and reused for every row in a
/// chunk; `?2..` carry the usernames. Chunks are capped at
/// `BATCH_LINK_ROWS_MAX` rows to stay under SQLite's bind-parameter
/// limit. Duplicate (issue_id, username) pairs are silently ignored.
fn insert_issue_assignees_batch_tx(
    tx: &Transaction<'_>,
    issue_id: i64,
    usernames: &[String],
) -> Result<()> {
    // `chunks` yields nothing for an empty slice, so empty input is a no-op.
    for chunk in usernames.chunks(BATCH_LINK_ROWS_MAX) {
        // Row i binds its username to placeholder ?(i + 2); ?1 is issue_id.
        let rows = (2..chunk.len() + 2)
            .map(|slot| format!("(?1, ?{slot})"))
            .collect::<Vec<_>>()
            .join(", ");
        let sql = format!(
            "INSERT OR IGNORE INTO issue_assignees (issue_id, username) VALUES {rows}"
        );
        let binds: Vec<&dyn rusqlite::types::ToSql> =
            std::iter::once(&issue_id as &dyn rusqlite::types::ToSql)
                .chain(chunk.iter().map(|name| name as &dyn rusqlite::types::ToSql))
                .collect();
        tx.execute(&sql, binds.as_slice())?;
    }
    Ok(())
}

View File

@@ -1,5 +1,6 @@
use super::*;
use crate::gitlab::types::GitLabAuthor;
use rusqlite::Connection;
fn passes_cursor_filter(issue: &GitLabIssue, cursor: &SyncCursor) -> Result<bool> {
let Some(cursor_ts) = cursor.updated_at_cursor else {
@@ -47,6 +48,26 @@ fn make_test_issue(id: i64, updated_at: &str) -> GitLabIssue {
}
}
/// Builds an in-memory SQLite database containing just the two issue
/// link tables the batch-insert tests exercise. The composite primary
/// keys are what make `INSERT OR IGNORE` deduplicate rows.
fn setup_link_tables() -> Connection {
    let db = Connection::open_in_memory().unwrap();
    let schema = "
        CREATE TABLE issue_labels (
            issue_id INTEGER NOT NULL,
            label_id INTEGER NOT NULL,
            PRIMARY KEY(issue_id, label_id)
        );
        CREATE TABLE issue_assignees (
            issue_id INTEGER NOT NULL,
            username TEXT NOT NULL,
            PRIMARY KEY(issue_id, username)
        );
    ";
    db.execute_batch(schema).unwrap();
    db
}
#[test]
fn cursor_filter_allows_newer_issues() {
let cursor = SyncCursor {
@@ -93,3 +114,83 @@ fn cursor_filter_allows_all_when_no_cursor() {
let issue = make_test_issue(1, "2020-01-01T00:00:00.000Z");
assert!(passes_cursor_filter(&issue, &cursor).unwrap_or(false));
}
#[test]
fn batch_issue_label_insert_deduplicates_ids() {
    // Label id 9 appears twice; OR IGNORE must collapse it to one row.
    let db = setup_link_tables();
    let tx = db.unchecked_transaction().unwrap();
    link_issue_labels_batch_tx(&tx, 42, &[9, 3, 9, 1]).unwrap();
    tx.commit().unwrap();
    let mut stmt = db
        .prepare("SELECT label_id FROM issue_labels WHERE issue_id = 42 ORDER BY label_id")
        .unwrap();
    let stored: Vec<i64> = stmt
        .query_map([], |row| row.get(0))
        .unwrap()
        .map(|row| row.unwrap())
        .collect();
    assert_eq!(stored, vec![1, 3, 9]);
}
#[test]
fn batch_issue_assignee_insert_deduplicates_usernames() {
    // "alice" appears twice; only one row per (issue, username) survives.
    let db = setup_link_tables();
    let tx = db.unchecked_transaction().unwrap();
    let users: Vec<String> = ["alice", "bob", "alice", "carol"]
        .iter()
        .map(|name| name.to_string())
        .collect();
    insert_issue_assignees_batch_tx(&tx, 7, &users).unwrap();
    tx.commit().unwrap();
    let mut stmt = db
        .prepare("SELECT username FROM issue_assignees WHERE issue_id = 7 ORDER BY username")
        .unwrap();
    let stored: Vec<String> = stmt
        .query_map([], |row| row.get(0))
        .unwrap()
        .map(|row| row.unwrap())
        .collect();
    assert_eq!(stored, vec!["alice", "bob", "carol"]);
}
#[test]
fn batch_issue_links_handle_multiple_chunks() {
    // Feed more rows than one chunk holds, plus one duplicate of the
    // first element, to prove chunking and OR IGNORE compose correctly.
    let db = setup_link_tables();
    let tx = db.unchecked_transaction().unwrap();
    // Same values as (0..MAX+3).map(|i| i + 10): the range 10..MAX+13.
    let mut label_ids: Vec<i64> = (10..BATCH_LINK_ROWS_MAX as i64 + 13).collect();
    label_ids.push(10); // duplicate of the first label id
    let mut users: Vec<String> = (0..BATCH_LINK_ROWS_MAX + 5)
        .map(|n| format!("user-{n}"))
        .collect();
    users.push("user-0".to_string()); // duplicate of the first username
    link_issue_labels_batch_tx(&tx, 11, &label_ids).unwrap();
    insert_issue_assignees_batch_tx(&tx, 11, &users).unwrap();
    tx.commit().unwrap();
    let count = |sql: &str| -> i64 { db.query_row(sql, [], |row| row.get(0)).unwrap() };
    assert_eq!(
        count("SELECT COUNT(*) FROM issue_labels WHERE issue_id = 11"),
        BATCH_LINK_ROWS_MAX as i64 + 3
    );
    assert_eq!(
        count("SELECT COUNT(*) FROM issue_assignees WHERE issue_id = 11"),
        (BATCH_LINK_ROWS_MAX + 5) as i64
    );
}

View File

@@ -36,6 +36,9 @@ struct SyncCursor {
tie_breaker_id: Option<i64>,
}
// Maximum link rows per multi-row INSERT chunk. Each row adds one bind
// parameter on top of the shared entity-id parameter (?1), so a full
// chunk binds 401 parameters — comfortably below SQLite's default
// 999 bind-parameter limit (SQLITE_MAX_VARIABLE_NUMBER).
const BATCH_LINK_ROWS_MAX: usize = 400;
pub async fn ingest_merge_requests(
conn: &Connection,
client: &GitLabClient,
@@ -252,37 +255,26 @@ fn process_mr_in_transaction(
"DELETE FROM mr_labels WHERE merge_request_id = ?",
[local_mr_id],
)?;
let mut label_ids = Vec::with_capacity(transformed.label_names.len());
for label_name in &transformed.label_names {
let label_id = upsert_label_tx(tx, project_id, label_name, &mut labels_created)?;
tx.execute(
"INSERT OR IGNORE INTO mr_labels (merge_request_id, label_id) VALUES (?, ?)",
(local_mr_id, label_id),
)?;
label_ids.push(label_id);
}
insert_mr_labels_batch_tx(tx, local_mr_id, &label_ids)?;
tx.execute(
"DELETE FROM mr_assignees WHERE merge_request_id = ?",
[local_mr_id],
)?;
let assignees_linked = transformed.assignee_usernames.len();
for username in &transformed.assignee_usernames {
tx.execute(
"INSERT OR IGNORE INTO mr_assignees (merge_request_id, username) VALUES (?, ?)",
(local_mr_id, username),
)?;
}
insert_mr_assignees_batch_tx(tx, local_mr_id, &transformed.assignee_usernames)?;
tx.execute(
"DELETE FROM mr_reviewers WHERE merge_request_id = ?",
[local_mr_id],
)?;
let reviewers_linked = transformed.reviewer_usernames.len();
for username in &transformed.reviewer_usernames {
tx.execute(
"INSERT OR IGNORE INTO mr_reviewers (merge_request_id, username) VALUES (?, ?)",
(local_mr_id, username),
)?;
}
insert_mr_reviewers_batch_tx(tx, local_mr_id, &transformed.reviewer_usernames)?;
Ok(ProcessMrResult {
labels_created,
@@ -315,6 +307,99 @@ fn upsert_label_tx(
Ok(id)
}
/// Links `merge_request_id` to every id in `label_ids` using chunked
/// multi-row `INSERT OR IGNORE` statements.
///
/// `?1` is bound once to the merge request id and reused for every row
/// in a chunk; `?2..` carry the label ids. Chunks are capped at
/// `BATCH_LINK_ROWS_MAX` rows to stay under SQLite's bind-parameter
/// limit. Duplicate (merge_request_id, label_id) pairs are ignored.
fn insert_mr_labels_batch_tx(
    tx: &Transaction<'_>,
    merge_request_id: i64,
    label_ids: &[i64],
) -> Result<()> {
    // `chunks` yields nothing for an empty slice, so empty input is a no-op.
    for chunk in label_ids.chunks(BATCH_LINK_ROWS_MAX) {
        // Row i binds its label id to placeholder ?(i + 2); ?1 is the MR id.
        let rows = (2..chunk.len() + 2)
            .map(|slot| format!("(?1, ?{slot})"))
            .collect::<Vec<_>>()
            .join(", ");
        let sql = format!(
            "INSERT OR IGNORE INTO mr_labels (merge_request_id, label_id) VALUES {rows}"
        );
        let binds: Vec<&dyn rusqlite::types::ToSql> =
            std::iter::once(&merge_request_id as &dyn rusqlite::types::ToSql)
                .chain(chunk.iter().map(|id| id as &dyn rusqlite::types::ToSql))
                .collect();
        tx.execute(&sql, binds.as_slice())?;
    }
    Ok(())
}
/// Inserts every username in `usernames` as an assignee of
/// `merge_request_id` using chunked multi-row `INSERT OR IGNORE`
/// statements.
///
/// `?1` is bound once to the merge request id and reused for every row
/// in a chunk; `?2..` carry the usernames. Chunks are capped at
/// `BATCH_LINK_ROWS_MAX` rows to stay under SQLite's bind-parameter
/// limit. Duplicate (merge_request_id, username) pairs are ignored.
fn insert_mr_assignees_batch_tx(
    tx: &Transaction<'_>,
    merge_request_id: i64,
    usernames: &[String],
) -> Result<()> {
    // `chunks` yields nothing for an empty slice, so empty input is a no-op.
    for chunk in usernames.chunks(BATCH_LINK_ROWS_MAX) {
        // Row i binds its username to placeholder ?(i + 2); ?1 is the MR id.
        let rows = (2..chunk.len() + 2)
            .map(|slot| format!("(?1, ?{slot})"))
            .collect::<Vec<_>>()
            .join(", ");
        let sql = format!(
            "INSERT OR IGNORE INTO mr_assignees (merge_request_id, username) VALUES {rows}"
        );
        let binds: Vec<&dyn rusqlite::types::ToSql> =
            std::iter::once(&merge_request_id as &dyn rusqlite::types::ToSql)
                .chain(chunk.iter().map(|name| name as &dyn rusqlite::types::ToSql))
                .collect();
        tx.execute(&sql, binds.as_slice())?;
    }
    Ok(())
}
/// Inserts every username in `usernames` as a reviewer of
/// `merge_request_id` using chunked multi-row `INSERT OR IGNORE`
/// statements.
///
/// `?1` is bound once to the merge request id and reused for every row
/// in a chunk; `?2..` carry the usernames. Chunks are capped at
/// `BATCH_LINK_ROWS_MAX` rows to stay under SQLite's bind-parameter
/// limit. Duplicate (merge_request_id, username) pairs are ignored.
fn insert_mr_reviewers_batch_tx(
    tx: &Transaction<'_>,
    merge_request_id: i64,
    usernames: &[String],
) -> Result<()> {
    // `chunks` yields nothing for an empty slice, so empty input is a no-op.
    for chunk in usernames.chunks(BATCH_LINK_ROWS_MAX) {
        // Row i binds its username to placeholder ?(i + 2); ?1 is the MR id.
        let rows = (2..chunk.len() + 2)
            .map(|slot| format!("(?1, ?{slot})"))
            .collect::<Vec<_>>()
            .join(", ");
        let sql = format!(
            "INSERT OR IGNORE INTO mr_reviewers (merge_request_id, username) VALUES {rows}"
        );
        let binds: Vec<&dyn rusqlite::types::ToSql> =
            std::iter::once(&merge_request_id as &dyn rusqlite::types::ToSql)
                .chain(chunk.iter().map(|name| name as &dyn rusqlite::types::ToSql))
                .collect();
        tx.execute(&sql, binds.as_slice())?;
    }
    Ok(())
}
fn passes_cursor_filter_with_ts(gitlab_id: i64, mr_ts: i64, cursor: &SyncCursor) -> bool {
let Some(cursor_ts) = cursor.updated_at_cursor else {
return true;
@@ -425,6 +510,32 @@ fn parse_timestamp(ts: &str) -> Result<i64> {
#[cfg(test)]
mod tests {
use super::*;
use rusqlite::Connection;
/// Builds an in-memory SQLite database containing just the three MR
/// link tables the batch-insert tests exercise. The composite primary
/// keys are what make `INSERT OR IGNORE` deduplicate rows.
fn setup_link_tables() -> Connection {
    let db = Connection::open_in_memory().unwrap();
    let schema = "
        CREATE TABLE mr_labels (
            merge_request_id INTEGER NOT NULL,
            label_id INTEGER NOT NULL,
            PRIMARY KEY(merge_request_id, label_id)
        );
        CREATE TABLE mr_assignees (
            merge_request_id INTEGER NOT NULL,
            username TEXT NOT NULL,
            PRIMARY KEY(merge_request_id, username)
        );
        CREATE TABLE mr_reviewers (
            merge_request_id INTEGER NOT NULL,
            username TEXT NOT NULL,
            PRIMARY KEY(merge_request_id, username)
        );
    ";
    db.execute_batch(schema).unwrap();
    db
}
#[test]
fn result_default_has_zero_counts() {
@@ -478,4 +589,103 @@ mod tests {
let old_ts = 1577836800000;
assert!(passes_cursor_filter_with_ts(1, old_ts, &cursor));
}
#[test]
fn batch_mr_label_insert_deduplicates_ids() {
    // Label id 5 appears twice; OR IGNORE must collapse it to one row.
    let db = setup_link_tables();
    let tx = db.unchecked_transaction().unwrap();
    insert_mr_labels_batch_tx(&tx, 99, &[5, 2, 5, 1]).unwrap();
    tx.commit().unwrap();
    let mut stmt = db
        .prepare("SELECT label_id FROM mr_labels WHERE merge_request_id = 99 ORDER BY label_id")
        .unwrap();
    let stored: Vec<i64> = stmt
        .query_map([], |row| row.get(0))
        .unwrap()
        .map(|row| row.unwrap())
        .collect();
    assert_eq!(stored, vec![1, 2, 5]);
}
#[test]
fn batch_mr_assignee_and_reviewer_insert_deduplicates_usernames() {
    // The same duplicated username list drives both link tables; each
    // must independently deduplicate via its composite primary key.
    let db = setup_link_tables();
    let tx = db.unchecked_transaction().unwrap();
    let users: Vec<String> = ["alice", "bob", "alice"]
        .iter()
        .map(|name| name.to_string())
        .collect();
    insert_mr_assignees_batch_tx(&tx, 33, &users).unwrap();
    insert_mr_reviewers_batch_tx(&tx, 33, &users).unwrap();
    tx.commit().unwrap();
    let fetch = |sql: &str| -> Vec<String> {
        db.prepare(sql)
            .unwrap()
            .query_map([], |row| row.get(0))
            .unwrap()
            .map(|row| row.unwrap())
            .collect()
    };
    let assignees =
        fetch("SELECT username FROM mr_assignees WHERE merge_request_id = 33 ORDER BY username");
    let reviewers =
        fetch("SELECT username FROM mr_reviewers WHERE merge_request_id = 33 ORDER BY username");
    assert_eq!(assignees, vec!["alice", "bob"]);
    assert_eq!(reviewers, vec!["alice", "bob"]);
}
#[test]
fn batch_mr_links_handle_multiple_chunks() {
    // Feed more rows than one chunk holds, plus one duplicate of the
    // first element, to prove chunking and OR IGNORE compose correctly
    // for all three MR link tables.
    let db = setup_link_tables();
    let tx = db.unchecked_transaction().unwrap();
    // Same values as (0..MAX+3).map(|i| i + 100): the range 100..MAX+103.
    let mut label_ids: Vec<i64> = (100..BATCH_LINK_ROWS_MAX as i64 + 103).collect();
    label_ids.push(100); // duplicate of the first label id
    let mut users: Vec<String> = (0..BATCH_LINK_ROWS_MAX + 5)
        .map(|n| format!("user-{n}"))
        .collect();
    users.push("user-0".to_string()); // duplicate of the first username
    insert_mr_labels_batch_tx(&tx, 77, &label_ids).unwrap();
    insert_mr_assignees_batch_tx(&tx, 77, &users).unwrap();
    insert_mr_reviewers_batch_tx(&tx, 77, &users).unwrap();
    tx.commit().unwrap();
    let count = |sql: &str| -> i64 { db.query_row(sql, [], |row| row.get(0)).unwrap() };
    assert_eq!(
        count("SELECT COUNT(*) FROM mr_labels WHERE merge_request_id = 77"),
        BATCH_LINK_ROWS_MAX as i64 + 3
    );
    assert_eq!(
        count("SELECT COUNT(*) FROM mr_assignees WHERE merge_request_id = 77"),
        (BATCH_LINK_ROWS_MAX + 5) as i64
    );
    assert_eq!(
        count("SELECT COUNT(*) FROM mr_reviewers WHERE merge_request_id = 77"),
        (BATCH_LINK_ROWS_MAX + 5) as i64
    );
}
}