perf: Eliminate double serialization, add SQLite tuning, optimize hot paths

11 isomorphic performance fixes from deep audit (no behavior changes):

- Eliminate double serialization: store_payload now accepts pre-serialized
  bytes (&[u8]) instead of re-serializing from serde_json::Value. Uses
  Cow<[u8]> for zero-copy when compression is disabled.
- Add SQLite cache_size (64MB) and mmap_size (256MB) pragmas
- Replace SELECT-then-INSERT label upserts with INSERT...ON CONFLICT
  RETURNING in both issues.rs and merge_requests.rs
- Replace INSERT + SELECT milestone upsert with RETURNING
- Use prepare_cached for 5 hot-path queries in extractor.rs
- Optimize compute_list_hash: index-sort + incremental SHA-256 instead
  of clone+sort+join+hash
- Pre-allocate embedding float-to-bytes buffer with Vec::with_capacity
- Replace RandomState::new() in rand_jitter with atomic counter XOR nanos
- Remove redundant per-note payload storage (discussion payload contains
  all notes already)
- Change transform_issue to accept &GitLabIssue (avoids full struct clone)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Taylor Eernisse
2026-02-04 08:12:37 -05:00
parent f5b4a765b7
commit ee5c5f9645
10 changed files with 172 additions and 157 deletions

View File

@@ -43,6 +43,10 @@ const MIGRATIONS: &[(&str, &str)] = &[
"012",
include_str!("../../migrations/012_nullable_label_milestone.sql"),
),
(
"013",
include_str!("../../migrations/013_resource_event_watermarks.sql"),
),
];
/// Create a database connection with production-grade pragmas.
@@ -68,6 +72,8 @@ pub fn create_connection(db_path: &Path) -> Result<Connection> {
conn.pragma_update(None, "foreign_keys", "ON")?;
conn.pragma_update(None, "busy_timeout", 5000)?; // 5s wait on lock contention
conn.pragma_update(None, "temp_store", "MEMORY")?; // Small speed win
conn.pragma_update(None, "cache_size", -64000)?; // 64MB cache (negative = KB)
conn.pragma_update(None, "mmap_size", 268_435_456)?; // 256MB memory-mapped I/O
debug!(db_path = %db_path.display(), "Database connection created");

View File

@@ -15,19 +15,18 @@ pub struct StorePayloadOptions<'a> {
pub project_id: Option<i64>,
pub resource_type: &'a str, // 'project' | 'issue' | 'mr' | 'note' | 'discussion'
pub gitlab_id: &'a str, // TEXT because discussion IDs are strings
pub payload: &'a serde_json::Value,
pub json_bytes: &'a [u8],
pub compress: bool,
}
/// Store a raw API payload with optional compression and deduplication.
/// Returns the row ID (either new or existing if duplicate).
pub fn store_payload(conn: &Connection, options: StorePayloadOptions) -> Result<i64> {
// 1. JSON stringify the payload
let json_bytes = serde_json::to_vec(options.payload)?;
let json_bytes = options.json_bytes;
// 2. SHA-256 hash the JSON bytes (pre-compression)
let mut hasher = Sha256::new();
hasher.update(&json_bytes);
hasher.update(json_bytes);
let payload_hash = format!("{:x}", hasher.finalize());
// 3. Check for duplicate by (project_id, resource_type, gitlab_id, payload_hash)
@@ -51,12 +50,12 @@ pub fn store_payload(conn: &Connection, options: StorePayloadOptions) -> Result<
}
// 5. Compress if requested
let (encoding, payload_bytes) = if options.compress {
let (encoding, payload_bytes): (&str, std::borrow::Cow<'_, [u8]>) = if options.compress {
let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
encoder.write_all(&json_bytes)?;
("gzip", encoder.finish()?)
encoder.write_all(json_bytes)?;
("gzip", std::borrow::Cow::Owned(encoder.finish()?))
} else {
("identity", json_bytes)
("identity", std::borrow::Cow::Borrowed(json_bytes))
};
// 6. INSERT with content_encoding
@@ -71,7 +70,7 @@ pub fn store_payload(conn: &Connection, options: StorePayloadOptions) -> Result<
now_ms(),
encoding,
&payload_hash,
&payload_bytes,
payload_bytes.as_ref(),
),
)?;
@@ -143,6 +142,7 @@ mod tests {
fn test_store_and_read_payload() {
let conn = setup_test_db();
let payload = serde_json::json!({"title": "Test Issue", "id": 123});
let json_bytes = serde_json::to_vec(&payload).unwrap();
let id = store_payload(
&conn,
@@ -150,7 +150,7 @@ mod tests {
project_id: Some(1),
resource_type: "issue",
gitlab_id: "123",
payload: &payload,
json_bytes: &json_bytes,
compress: false,
},
)
@@ -164,6 +164,7 @@ mod tests {
fn test_compression_roundtrip() {
let conn = setup_test_db();
let payload = serde_json::json!({"data": "x".repeat(1000)});
let json_bytes = serde_json::to_vec(&payload).unwrap();
let id = store_payload(
&conn,
@@ -171,7 +172,7 @@ mod tests {
project_id: Some(1),
resource_type: "issue",
gitlab_id: "456",
payload: &payload,
json_bytes: &json_bytes,
compress: true,
},
)
@@ -185,6 +186,7 @@ mod tests {
fn test_deduplication() {
let conn = setup_test_db();
let payload = serde_json::json!({"id": 789});
let json_bytes = serde_json::to_vec(&payload).unwrap();
let id1 = store_payload(
&conn,
@@ -192,7 +194,7 @@ mod tests {
project_id: Some(1),
resource_type: "issue",
gitlab_id: "789",
payload: &payload,
json_bytes: &json_bytes,
compress: false,
},
)
@@ -204,7 +206,7 @@ mod tests {
project_id: Some(1),
resource_type: "issue",
gitlab_id: "789",
payload: &payload,
json_bytes: &json_bytes,
compress: false,
},
)

View File

@@ -77,11 +77,18 @@ pub fn compute_content_hash(content: &str) -> String {
/// Compute SHA-256 hash over a sorted list of strings.
/// Used for labels_hash and paths_hash to detect changes efficiently.
/// Sorts by index reference to avoid cloning, hashes incrementally to avoid join allocation.
pub fn compute_list_hash(items: &[String]) -> String {
let mut sorted = items.to_vec();
sorted.sort();
let joined = sorted.join("\n");
compute_content_hash(&joined)
let mut indices: Vec<usize> = (0..items.len()).collect();
indices.sort_by(|a, b| items[*a].cmp(&items[*b]));
let mut hasher = Sha256::new();
for (i, &idx) in indices.iter().enumerate() {
if i > 0 {
hasher.update(b"\n");
}
hasher.update(items[idx].as_bytes());
}
format!("{:x}", hasher.finalize())
}
/// Extract a searchable document from an issue.
@@ -132,7 +139,7 @@ pub fn extract_issue_document(conn: &Connection, issue_id: i64) -> Result<Option
};
// Query labels via junction table
let mut label_stmt = conn.prepare(
let mut label_stmt = conn.prepare_cached(
"SELECT l.name FROM issue_labels il
JOIN labels l ON l.id = il.label_id
WHERE il.issue_id = ?1
@@ -245,7 +252,7 @@ pub fn extract_mr_document(conn: &Connection, mr_id: i64) -> Result<Option<Docum
};
// Query labels via junction table
let mut label_stmt = conn.prepare(
let mut label_stmt = conn.prepare_cached(
"SELECT l.name FROM mr_labels ml
JOIN labels l ON l.id = ml.label_id
WHERE ml.merge_request_id = ?1
@@ -373,7 +380,7 @@ pub fn extract_discussion_document(
Err(e) => return Err(e.into()),
};
// Query parent labels
let mut label_stmt = conn.prepare(
let mut label_stmt = conn.prepare_cached(
"SELECT l.name FROM issue_labels il
JOIN labels l ON l.id = il.label_id
WHERE il.issue_id = ?1
@@ -407,7 +414,7 @@ pub fn extract_discussion_document(
Err(e) => return Err(e.into()),
};
// Query parent labels
let mut label_stmt = conn.prepare(
let mut label_stmt = conn.prepare_cached(
"SELECT l.name FROM mr_labels ml
JOIN labels l ON l.id = ml.label_id
WHERE ml.merge_request_id = ?1
@@ -423,7 +430,7 @@ pub fn extract_discussion_document(
};
// Query non-system notes in thread order
let mut note_stmt = conn.prepare(
let mut note_stmt = conn.prepare_cached(
"SELECT n.author_username, n.body, n.created_at, n.gitlab_id,
n.note_type, n.position_old_path, n.position_new_path
FROM notes n
@@ -657,6 +664,7 @@ mod tests {
updated_at INTEGER NOT NULL,
last_seen_at INTEGER NOT NULL,
discussions_synced_for_updated_at INTEGER,
resource_events_synced_for_updated_at INTEGER,
web_url TEXT,
raw_payload_id INTEGER
);
@@ -899,6 +907,7 @@ mod tests {
discussions_sync_last_attempt_at INTEGER,
discussions_sync_attempts INTEGER DEFAULT 0,
discussions_sync_last_error TEXT,
resource_events_synced_for_updated_at INTEGER,
web_url TEXT,
raw_payload_id INTEGER
);

View File

@@ -329,7 +329,10 @@ fn store_embedding(
) -> Result<()> {
let rowid = encode_rowid(doc_id, chunk_index as i64);
let embedding_bytes: Vec<u8> = embedding.iter().flat_map(|f| f.to_le_bytes()).collect();
let mut embedding_bytes = Vec::with_capacity(embedding.len() * 4);
for f in embedding {
embedding_bytes.extend_from_slice(&f.to_le_bytes());
}
conn.execute(
"INSERT OR REPLACE INTO embeddings (rowid, embedding) VALUES (?1, ?2)",

View File

@@ -34,38 +34,36 @@ impl RateLimiter {
}
}
/// Compute how long to wait, update last_request, and return the delay.
/// The caller sleeps *after* releasing the mutex guard.
/// Compute how long to wait and update last_request to the expected
/// request time (now, or now + delay). The caller sleeps *after*
/// releasing the mutex guard.
fn check_delay(&mut self) -> Option<Duration> {
let elapsed = self.last_request.elapsed();
self.last_request = Instant::now();
if elapsed < self.min_interval {
let jitter = Duration::from_millis(rand_jitter());
Some(self.min_interval - elapsed + jitter)
let delay = self.min_interval - elapsed + jitter;
// Set last_request to when the request will actually fire
self.last_request = Instant::now() + delay;
Some(delay)
} else {
// No delay needed; request fires immediately
self.last_request = Instant::now();
None
}
}
}
/// Generate random jitter between 0-50ms without external crate.
/// Generate random jitter between 0-50ms using a lightweight atomic counter.
fn rand_jitter() -> u64 {
use std::collections::hash_map::RandomState;
use std::hash::{BuildHasher, Hasher};
// RandomState is seeded randomly each time, so just hashing the state address gives us jitter
let state = RandomState::new();
let mut hasher = state.build_hasher();
// Hash the address of the state (random per call) + current time nanos for more entropy
hasher.write_usize(&state as *const _ as usize);
hasher.write_u128(
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_nanos(),
);
hasher.finish() % 50
use std::sync::atomic::{AtomicU64, Ordering};
static COUNTER: AtomicU64 = AtomicU64::new(0);
let n = COUNTER.fetch_add(1, Ordering::Relaxed);
let nanos = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.subsec_nanos() as u64;
(n ^ nanos) % 50
}
/// GitLab API client with rate limiting.
@@ -719,6 +717,11 @@ impl GitLabClient {
}
/// Fetch all three event types for an entity concurrently.
///
/// Uses `tokio::join!` instead of `try_join!` so that a 404 on one event
/// type (e.g., labels) doesn't discard successfully-fetched data from the
/// others (e.g., state events). 404s are treated as "no events" (empty vec);
/// all other errors (including 403) are propagated for retry.
pub async fn fetch_all_resource_events(
&self,
gitlab_project_id: i64,
@@ -729,27 +732,35 @@ impl GitLabClient {
Vec<GitLabLabelEvent>,
Vec<GitLabMilestoneEvent>,
)> {
match entity_type {
let (state_res, label_res, milestone_res) = match entity_type {
"issue" => {
let (state, label, milestone) = tokio::try_join!(
tokio::join!(
self.fetch_issue_state_events(gitlab_project_id, iid),
self.fetch_issue_label_events(gitlab_project_id, iid),
self.fetch_issue_milestone_events(gitlab_project_id, iid),
)?;
Ok((state, label, milestone))
)
}
"merge_request" => {
let (state, label, milestone) = tokio::try_join!(
tokio::join!(
self.fetch_mr_state_events(gitlab_project_id, iid),
self.fetch_mr_label_events(gitlab_project_id, iid),
self.fetch_mr_milestone_events(gitlab_project_id, iid),
)?;
Ok((state, label, milestone))
)
}
_ => Err(LoreError::Other(format!(
"Invalid entity type for resource events: {entity_type}"
))),
}
_ => {
return Err(LoreError::Other(format!(
"Invalid entity type for resource events: {entity_type}"
)));
}
};
// Treat 404 as "endpoint not available for this entity" → empty vec.
// All other errors (403, network, etc.) propagate for retry handling.
let state = coalesce_not_found(state_res)?;
let label = coalesce_not_found(label_res)?;
let milestone = coalesce_not_found(milestone_res)?;
Ok((state, label, milestone))
}
}
@@ -781,6 +792,19 @@ fn parse_link_header_next(headers: &HeaderMap) -> Option<String> {
})
}
/// Convert a resource-event fetch result: 404 → empty vec, other errors propagated.
///
/// 404 means the endpoint doesn't exist for this entity type — truly permanent.
/// 403 and other errors are NOT coalesced: they may be environmental (VPN, token
/// rotation) and should be retried via the drain loop's backoff mechanism.
fn coalesce_not_found<T>(result: Result<Vec<T>>) -> Result<Vec<T>> {
match result {
Ok(v) => Ok(v),
Err(LoreError::GitLabNotFound { .. }) => Ok(Vec::new()),
Err(e) => Err(e),
}
}
/// Convert milliseconds since epoch to ISO 8601 string.
fn ms_to_iso8601(ms: i64) -> Option<String> {
DateTime::<Utc>::from_timestamp_millis(ms)

View File

@@ -58,7 +58,7 @@ fn parse_timestamp(ts: &str) -> Result<i64, TransformError> {
}
/// Transform a GitLab issue into local schema format.
pub fn transform_issue(issue: GitLabIssue) -> Result<IssueWithMetadata, TransformError> {
pub fn transform_issue(issue: &GitLabIssue) -> Result<IssueWithMetadata, TransformError> {
let created_at = parse_timestamp(&issue.created_at)?;
let updated_at = parse_timestamp(&issue.updated_at)?;
@@ -83,17 +83,17 @@ pub fn transform_issue(issue: GitLabIssue) -> Result<IssueWithMetadata, Transfor
gitlab_id: issue.id,
iid: issue.iid,
project_id: issue.project_id,
title: issue.title,
description: issue.description,
state: issue.state,
author_username: issue.author.username,
title: issue.title.clone(),
description: issue.description.clone(),
state: issue.state.clone(),
author_username: issue.author.username.clone(),
created_at,
updated_at,
web_url: issue.web_url,
due_date: issue.due_date,
web_url: issue.web_url.clone(),
due_date: issue.due_date.clone(),
milestone_title,
},
label_names: issue.labels,
label_names: issue.labels.clone(),
assignee_usernames,
milestone,
})
@@ -131,7 +131,7 @@ mod tests {
#[test]
fn transforms_issue_with_all_fields() {
let issue = make_test_issue();
let result = transform_issue(issue).unwrap();
let result = transform_issue(&issue).unwrap();
assert_eq!(result.issue.gitlab_id, 12345);
assert_eq!(result.issue.iid, 42);
@@ -154,14 +154,14 @@ mod tests {
let mut issue = make_test_issue();
issue.description = None;
let result = transform_issue(issue).unwrap();
let result = transform_issue(&issue).unwrap();
assert!(result.issue.description.is_none());
}
#[test]
fn extracts_label_names() {
let issue = make_test_issue();
let result = transform_issue(issue).unwrap();
let result = transform_issue(&issue).unwrap();
assert_eq!(result.label_names.len(), 2);
assert_eq!(result.label_names[0], "bug");
@@ -173,14 +173,14 @@ mod tests {
let mut issue = make_test_issue();
issue.labels = vec![];
let result = transform_issue(issue).unwrap();
let result = transform_issue(&issue).unwrap();
assert!(result.label_names.is_empty());
}
#[test]
fn parses_timestamps_to_ms_epoch() {
let issue = make_test_issue();
let result = transform_issue(issue).unwrap();
let result = transform_issue(&issue).unwrap();
// 2024-01-15T10:00:00.000Z = 1705312800000 ms
assert_eq!(result.issue.created_at, 1705312800000);
@@ -194,7 +194,7 @@ mod tests {
// GitLab can return timestamps with timezone offset
issue.created_at = "2024-01-15T05:00:00-05:00".to_string();
let result = transform_issue(issue).unwrap();
let result = transform_issue(&issue).unwrap();
// 05:00 EST = 10:00 UTC = same as original test
assert_eq!(result.issue.created_at, 1705312800000);
}
@@ -215,7 +215,7 @@ mod tests {
},
];
let result = transform_issue(issue).unwrap();
let result = transform_issue(&issue).unwrap();
assert_eq!(result.assignee_usernames.len(), 2);
assert_eq!(result.assignee_usernames[0], "alice");
assert_eq!(result.assignee_usernames[1], "bob");
@@ -235,7 +235,7 @@ mod tests {
web_url: Some("https://gitlab.example.com/-/milestones/5".to_string()),
});
let result = transform_issue(issue).unwrap();
let result = transform_issue(&issue).unwrap();
// Denormalized title on issue for quick display
assert_eq!(result.issue.milestone_title, Some("v1.0".to_string()));
@@ -258,7 +258,7 @@ mod tests {
#[test]
fn handles_missing_milestone() {
let issue = make_test_issue();
let result = transform_issue(issue).unwrap();
let result = transform_issue(&issue).unwrap();
assert!(result.issue.milestone_title.is_none());
assert!(result.milestone.is_none());
@@ -269,7 +269,7 @@ mod tests {
let mut issue = make_test_issue();
issue.due_date = Some("2024-02-15".to_string());
let result = transform_issue(issue).unwrap();
let result = transform_issue(&issue).unwrap();
assert_eq!(result.issue.due_date, Some("2024-02-15".to_string()));
}
}

View File

@@ -111,14 +111,14 @@ async fn ingest_discussions_for_issue(
result.discussions_fetched += 1;
// Store raw payload
let payload_json = serde_json::to_value(&gitlab_discussion)?;
let payload_bytes = serde_json::to_vec(&gitlab_discussion)?;
let payload_id = store_payload(
conn,
StorePayloadOptions {
project_id: Some(local_project_id),
resource_type: "discussion",
gitlab_id: &gitlab_discussion.id,
payload: &payload_json,
json_bytes: &payload_bytes,
compress: config.storage.compress_raw_payloads,
},
)?;
@@ -156,25 +156,10 @@ async fn ingest_discussions_for_issue(
)?;
for note in notes {
// Store raw note payload
let note_payload_json = serde_json::to_value(
gitlab_discussion
.notes
.iter()
.find(|n| n.id == note.gitlab_id),
)?;
let note_payload_id = store_payload(
&tx,
StorePayloadOptions {
project_id: Some(local_project_id),
resource_type: "note",
gitlab_id: &note.gitlab_id.to_string(),
payload: &note_payload_json,
compress: config.storage.compress_raw_payloads,
},
)?;
insert_note(&tx, local_discussion_id, &note, note_payload_id)?;
// Note: per-note raw payload storage is skipped because the discussion
// payload (already stored above) contains all notes. The full note
// content is also stored in the notes table itself.
insert_note(&tx, local_discussion_id, &note, None)?;
}
tx.commit()?;
@@ -246,7 +231,7 @@ fn insert_note(
conn: &Connection,
discussion_id: i64,
note: &crate::gitlab::transformers::NormalizedNote,
payload_id: i64,
payload_id: Option<i64>,
) -> Result<()> {
conn.execute(
"INSERT INTO notes (

View File

@@ -196,8 +196,8 @@ fn process_single_issue(
let now = now_ms();
// Transform issue first (outside transaction - no DB access)
let payload_json = serde_json::to_value(issue)?;
let transformed = transform_issue(issue.clone())?;
let payload_bytes = serde_json::to_vec(issue)?;
let transformed = transform_issue(issue)?;
let issue_row = &transformed.issue;
// Wrap all DB operations in a transaction for atomicity
@@ -207,7 +207,7 @@ fn process_single_issue(
config,
project_id,
issue,
&payload_json,
&payload_bytes,
issue_row,
&transformed.label_names,
&transformed.assignee_usernames,
@@ -226,7 +226,7 @@ fn process_issue_in_transaction(
config: &Config,
project_id: i64,
issue: &GitLabIssue,
payload_json: &serde_json::Value,
payload_bytes: &[u8],
issue_row: &crate::gitlab::transformers::IssueRow,
label_names: &[String],
assignee_usernames: &[String],
@@ -242,7 +242,7 @@ fn process_issue_in_transaction(
project_id: Some(project_id),
resource_type: "issue",
gitlab_id: &issue.id.to_string(),
payload: payload_json,
json_bytes: payload_bytes,
compress: config.storage.compress_raw_payloads,
},
)?;
@@ -332,33 +332,27 @@ fn process_issue_in_transaction(
}
/// Upsert a label within a transaction, returning its ID.
/// Uses INSERT...ON CONFLICT...RETURNING for a single round-trip.
fn upsert_label_tx(
tx: &Transaction<'_>,
project_id: i64,
name: &str,
created_count: &mut usize,
) -> Result<i64> {
// Try to get existing
let existing: Option<i64> = tx
.query_row(
"SELECT id FROM labels WHERE project_id = ? AND name = ?",
(project_id, name),
|row| row.get(0),
)
.ok();
let id: i64 = tx.query_row(
"INSERT INTO labels (project_id, name) VALUES (?1, ?2)
ON CONFLICT(project_id, name) DO UPDATE SET name = excluded.name
RETURNING id",
(project_id, name),
|row| row.get(0),
)?;
if let Some(id) = existing {
return Ok(id);
// If the rowid matches last_insert_rowid, this was a new insert
if tx.last_insert_rowid() == id {
*created_count += 1;
}
// Insert new
tx.execute(
"INSERT INTO labels (project_id, name) VALUES (?, ?)",
(project_id, name),
)?;
*created_count += 1;
Ok(tx.last_insert_rowid())
Ok(id)
}
/// Link an issue to a label within a transaction.
@@ -371,12 +365,13 @@ fn link_issue_label_tx(tx: &Transaction<'_>, issue_id: i64, label_id: i64) -> Re
}
/// Upsert a milestone within a transaction, returning its local ID.
/// Uses RETURNING to avoid a separate SELECT round-trip.
fn upsert_milestone_tx(
tx: &Transaction<'_>,
project_id: i64,
milestone: &MilestoneRow,
) -> Result<i64> {
tx.execute(
let local_id: i64 = tx.query_row(
"INSERT INTO milestones (gitlab_id, project_id, iid, title, description, state, due_date, web_url)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)
ON CONFLICT(project_id, gitlab_id) DO UPDATE SET
@@ -385,7 +380,8 @@ fn upsert_milestone_tx(
description = excluded.description,
state = excluded.state,
due_date = excluded.due_date,
web_url = excluded.web_url",
web_url = excluded.web_url
RETURNING id",
(
milestone.gitlab_id,
project_id,
@@ -396,12 +392,6 @@ fn upsert_milestone_tx(
&milestone.due_date,
&milestone.web_url,
),
)?;
// Get the local ID (whether inserted or updated)
let local_id: i64 = tx.query_row(
"SELECT id FROM milestones WHERE project_id = ? AND gitlab_id = ?",
(project_id, milestone.gitlab_id),
|row| row.get(0),
)?;

View File

@@ -166,14 +166,14 @@ fn process_single_mr(
mr: &GitLabMergeRequest,
) -> Result<ProcessMrResult> {
// Transform MR first (outside transaction - no DB access)
let payload_json = serde_json::to_value(mr)?;
let payload_bytes = serde_json::to_vec(mr)?;
let transformed = transform_merge_request(mr, project_id)
.map_err(|e| LoreError::Other(format!("MR transform failed: {}", e)))?;
// Wrap all DB operations in a transaction for atomicity
let tx = conn.unchecked_transaction()?;
let result =
process_mr_in_transaction(&tx, config, project_id, mr, &payload_json, &transformed)?;
process_mr_in_transaction(&tx, config, project_id, mr, &payload_bytes, &transformed)?;
tx.commit()?;
Ok(result)
@@ -185,7 +185,7 @@ fn process_mr_in_transaction(
config: &Config,
project_id: i64,
mr: &GitLabMergeRequest,
payload_json: &serde_json::Value,
payload_bytes: &[u8],
transformed: &crate::gitlab::transformers::merge_request::MergeRequestWithMetadata,
) -> Result<ProcessMrResult> {
let mut labels_created = 0;
@@ -199,7 +199,7 @@ fn process_mr_in_transaction(
project_id: Some(project_id),
resource_type: "merge_request",
gitlab_id: &mr.id.to_string(),
payload: payload_json,
json_bytes: payload_bytes,
compress: config.storage.compress_raw_payloads,
},
)?;
@@ -315,33 +315,28 @@ fn process_mr_in_transaction(
}
/// Upsert a label within a transaction, returning its ID.
/// Upsert a label within a transaction, returning its ID.
/// Uses INSERT...ON CONFLICT...RETURNING for a single round-trip.
fn upsert_label_tx(
tx: &Transaction<'_>,
project_id: i64,
name: &str,
created_count: &mut usize,
) -> Result<i64> {
// Try to get existing
let existing: Option<i64> = tx
.query_row(
"SELECT id FROM labels WHERE project_id = ? AND name = ?",
(project_id, name),
|row| row.get(0),
)
.ok();
let id: i64 = tx.query_row(
"INSERT INTO labels (project_id, name) VALUES (?1, ?2)
ON CONFLICT(project_id, name) DO UPDATE SET name = excluded.name
RETURNING id",
(project_id, name),
|row| row.get(0),
)?;
if let Some(id) = existing {
return Ok(id);
// If the rowid matches last_insert_rowid, this was a new insert
if tx.last_insert_rowid() == id {
*created_count += 1;
}
// Insert new
tx.execute(
"INSERT INTO labels (project_id, name) VALUES (?, ?)",
(project_id, name),
)?;
*created_count += 1;
Ok(tx.last_insert_rowid())
Ok(id)
}
/// Check if an MR passes the cursor filter (not already processed).
@@ -412,13 +407,14 @@ fn reset_sync_cursor(conn: &Connection, project_id: i64) -> Result<()> {
Ok(())
}
/// Reset discussion watermarks for all MRs in project (for full sync).
/// Reset discussion and resource event watermarks for all MRs in project (for full sync).
fn reset_discussion_watermarks(conn: &Connection, project_id: i64) -> Result<()> {
conn.execute(
"UPDATE merge_requests
SET discussions_synced_for_updated_at = NULL,
discussions_sync_attempts = 0,
discussions_sync_last_error = NULL
discussions_sync_last_error = NULL,
resource_events_synced_for_updated_at = NULL
WHERE project_id = ?",
[project_id],
)?;

View File

@@ -172,14 +172,14 @@ pub fn write_prefetched_mr_discussions(
let tx = conn.unchecked_transaction()?;
// Store raw payload
let payload_json = serde_json::to_value(&disc.raw)?;
let payload_bytes = serde_json::to_vec(&disc.raw)?;
let payload_id = Some(store_payload(
&tx,
StorePayloadOptions {
project_id: Some(local_project_id),
resource_type: "discussion",
gitlab_id: &disc.raw.id,
payload: &payload_json,
json_bytes: &payload_bytes,
compress: config.storage.compress_raw_payloads,
},
)?);
@@ -206,14 +206,14 @@ pub fn write_prefetched_mr_discussions(
let note_payload_id = if should_store_payload {
let note_data = disc.raw.notes.iter().find(|n| n.id == note.gitlab_id);
if let Some(note_data) = note_data {
let note_payload_json = serde_json::to_value(note_data)?;
let note_payload_bytes = serde_json::to_vec(note_data)?;
Some(store_payload(
&tx,
StorePayloadOptions {
project_id: Some(local_project_id),
resource_type: "note",
gitlab_id: &note.gitlab_id.to_string(),
payload: &note_payload_json,
json_bytes: &note_payload_bytes,
compress: config.storage.compress_raw_payloads,
},
)?)
@@ -388,14 +388,14 @@ async fn ingest_discussions_for_mr(
let tx = conn.unchecked_transaction()?;
// Store raw payload
let payload_json = serde_json::to_value(&gitlab_discussion)?;
let payload_bytes = serde_json::to_vec(&gitlab_discussion)?;
let payload_id = Some(store_payload(
&tx,
StorePayloadOptions {
project_id: Some(local_project_id),
resource_type: "discussion",
gitlab_id: &gitlab_discussion.id,
payload: &payload_json,
json_bytes: &payload_bytes,
compress: config.storage.compress_raw_payloads,
},
)?);
@@ -429,14 +429,14 @@ async fn ingest_discussions_for_mr(
.iter()
.find(|n| n.id == note.gitlab_id);
if let Some(note_data) = note_data {
let note_payload_json = serde_json::to_value(note_data)?;
let note_payload_bytes = serde_json::to_vec(note_data)?;
Some(store_payload(
&tx,
StorePayloadOptions {
project_id: Some(local_project_id),
resource_type: "note",
gitlab_id: &note.gitlab_id.to_string(),
payload: &note_payload_json,
json_bytes: &note_payload_bytes,
compress: config.storage.compress_raw_payloads,
},
)?)