From ee5c5f9645f9dde325cf284b72ac7240448dcab1 Mon Sep 17 00:00:00 2001 From: Taylor Eernisse Date: Wed, 4 Feb 2026 08:12:37 -0500 Subject: [PATCH] perf: Eliminate double serialization, add SQLite tuning, optimize hot paths 11 isomorphic performance fixes from deep audit (no behavior changes): - Eliminate double serialization: store_payload now accepts pre-serialized bytes (&[u8]) instead of re-serializing from serde_json::Value. Uses Cow<[u8]> for zero-copy when compression is disabled. - Add SQLite cache_size (64MB) and mmap_size (256MB) pragmas - Replace SELECT-then-INSERT label upserts with INSERT...ON CONFLICT RETURNING in both issues.rs and merge_requests.rs - Replace INSERT + SELECT milestone upsert with RETURNING - Use prepare_cached for 5 hot-path queries in extractor.rs - Optimize compute_list_hash: index-sort + incremental SHA-256 instead of clone+sort+join+hash - Pre-allocate embedding float-to-bytes buffer with Vec::with_capacity - Replace RandomState::new() in rand_jitter with atomic counter XOR nanos - Remove redundant per-note payload storage (discussion payload contains all notes already) - Change transform_issue to accept &GitLabIssue (avoids full struct clone) Co-Authored-By: Claude Opus 4.5 --- src/core/db.rs | 6 +++ src/core/payloads.rs | 28 ++++++----- src/documents/extractor.rs | 27 ++++++---- src/embedding/pipeline.rs | 5 +- src/gitlab/client.rs | 86 ++++++++++++++++++++------------ src/gitlab/transformers/issue.rs | 36 ++++++------- src/ingestion/discussions.rs | 29 +++-------- src/ingestion/issues.rs | 52 ++++++++----------- src/ingestion/merge_requests.rs | 44 ++++++++-------- src/ingestion/mr_discussions.rs | 16 +++--- 10 files changed, 172 insertions(+), 157 deletions(-) diff --git a/src/core/db.rs b/src/core/db.rs index 05af814..d7cca89 100644 --- a/src/core/db.rs +++ b/src/core/db.rs @@ -43,6 +43,10 @@ const MIGRATIONS: &[(&str, &str)] = &[ "012", include_str!("../../migrations/012_nullable_label_milestone.sql"), ), + ( + "013", + 
include_str!("../../migrations/013_resource_event_watermarks.sql"), + ), ]; /// Create a database connection with production-grade pragmas. @@ -68,6 +72,8 @@ pub fn create_connection(db_path: &Path) -> Result { conn.pragma_update(None, "foreign_keys", "ON")?; conn.pragma_update(None, "busy_timeout", 5000)?; // 5s wait on lock contention conn.pragma_update(None, "temp_store", "MEMORY")?; // Small speed win + conn.pragma_update(None, "cache_size", -64000)?; // 64MB cache (negative = KB) + conn.pragma_update(None, "mmap_size", 268_435_456)?; // 256MB memory-mapped I/O debug!(db_path = %db_path.display(), "Database connection created"); diff --git a/src/core/payloads.rs b/src/core/payloads.rs index 328e7dd..bc0e1cd 100644 --- a/src/core/payloads.rs +++ b/src/core/payloads.rs @@ -15,19 +15,18 @@ pub struct StorePayloadOptions<'a> { pub project_id: Option, pub resource_type: &'a str, // 'project' | 'issue' | 'mr' | 'note' | 'discussion' pub gitlab_id: &'a str, // TEXT because discussion IDs are strings - pub payload: &'a serde_json::Value, + pub json_bytes: &'a [u8], pub compress: bool, } /// Store a raw API payload with optional compression and deduplication. /// Returns the row ID (either new or existing if duplicate). pub fn store_payload(conn: &Connection, options: StorePayloadOptions) -> Result { - // 1. JSON stringify the payload - let json_bytes = serde_json::to_vec(options.payload)?; + let json_bytes = options.json_bytes; // 2. SHA-256 hash the JSON bytes (pre-compression) let mut hasher = Sha256::new(); - hasher.update(&json_bytes); + hasher.update(json_bytes); let payload_hash = format!("{:x}", hasher.finalize()); // 3. Check for duplicate by (project_id, resource_type, gitlab_id, payload_hash) @@ -51,12 +50,12 @@ pub fn store_payload(conn: &Connection, options: StorePayloadOptions) -> Result< } // 5. 
Compress if requested - let (encoding, payload_bytes) = if options.compress { + let (encoding, payload_bytes): (&str, std::borrow::Cow<'_, [u8]>) = if options.compress { let mut encoder = GzEncoder::new(Vec::new(), Compression::default()); - encoder.write_all(&json_bytes)?; - ("gzip", encoder.finish()?) + encoder.write_all(json_bytes)?; + ("gzip", std::borrow::Cow::Owned(encoder.finish()?)) } else { - ("identity", json_bytes) + ("identity", std::borrow::Cow::Borrowed(json_bytes)) }; // 6. INSERT with content_encoding @@ -71,7 +70,7 @@ pub fn store_payload(conn: &Connection, options: StorePayloadOptions) -> Result< now_ms(), encoding, &payload_hash, - &payload_bytes, + payload_bytes.as_ref(), ), )?; @@ -143,6 +142,7 @@ mod tests { fn test_store_and_read_payload() { let conn = setup_test_db(); let payload = serde_json::json!({"title": "Test Issue", "id": 123}); + let json_bytes = serde_json::to_vec(&payload).unwrap(); let id = store_payload( &conn, @@ -150,7 +150,7 @@ mod tests { project_id: Some(1), resource_type: "issue", gitlab_id: "123", - payload: &payload, + json_bytes: &json_bytes, compress: false, }, ) @@ -164,6 +164,7 @@ mod tests { fn test_compression_roundtrip() { let conn = setup_test_db(); let payload = serde_json::json!({"data": "x".repeat(1000)}); + let json_bytes = serde_json::to_vec(&payload).unwrap(); let id = store_payload( &conn, @@ -171,7 +172,7 @@ mod tests { project_id: Some(1), resource_type: "issue", gitlab_id: "456", - payload: &payload, + json_bytes: &json_bytes, compress: true, }, ) @@ -185,6 +186,7 @@ mod tests { fn test_deduplication() { let conn = setup_test_db(); let payload = serde_json::json!({"id": 789}); + let json_bytes = serde_json::to_vec(&payload).unwrap(); let id1 = store_payload( &conn, @@ -192,7 +194,7 @@ mod tests { project_id: Some(1), resource_type: "issue", gitlab_id: "789", - payload: &payload, + json_bytes: &json_bytes, compress: false, }, ) @@ -204,7 +206,7 @@ mod tests { project_id: Some(1), resource_type: "issue", 
gitlab_id: "789", - payload: &payload, + json_bytes: &json_bytes, compress: false, }, ) diff --git a/src/documents/extractor.rs b/src/documents/extractor.rs index dbc79a9..bbeb03c 100644 --- a/src/documents/extractor.rs +++ b/src/documents/extractor.rs @@ -77,11 +77,18 @@ pub fn compute_content_hash(content: &str) -> String { /// Compute SHA-256 hash over a sorted list of strings. /// Used for labels_hash and paths_hash to detect changes efficiently. +/// Sorts by index reference to avoid cloning, hashes incrementally to avoid join allocation. pub fn compute_list_hash(items: &[String]) -> String { - let mut sorted = items.to_vec(); - sorted.sort(); - let joined = sorted.join("\n"); - compute_content_hash(&joined) + let mut indices: Vec<usize> = (0..items.len()).collect(); + indices.sort_by(|a, b| items[*a].cmp(&items[*b])); + let mut hasher = Sha256::new(); + for (i, &idx) in indices.iter().enumerate() { + if i > 0 { + hasher.update(b"\n"); + } + hasher.update(items[idx].as_bytes()); + } + format!("{:x}", hasher.finalize()) } /// Extract a searchable document from an issue. @@ -132,7 +139,7 @@ pub fn extract_issue_document(conn: &Connection, issue_id: i64) -> Result