feat(ingestion): Mark entities dirty on ingest for document regeneration
Integrates the dirty tracking system into all four ingestion paths (issues, MRs, issue discussions, MR discussions). After each entity is upserted within its transaction, a corresponding dirty_queue entry is inserted so the document regenerator knows which documents need rebuilding.

This ensures that document generation stays transactionally consistent with data changes: if the ingest transaction rolls back, the dirty marker rolls back too, preventing stale document regeneration attempts.

Also updates GiError references to LoreError in these files as part of the codebase-wide rename, and adjusts issue discussion logging from info to debug level to reduce noise during normal sync runs.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
@@ -8,11 +8,13 @@
|
|||||||
|
|
||||||
use futures::StreamExt;
|
use futures::StreamExt;
|
||||||
use rusqlite::Connection;
|
use rusqlite::Connection;
|
||||||
use tracing::{debug, info, warn};
|
use tracing::{debug, warn};
|
||||||
|
|
||||||
use crate::Config;
|
use crate::Config;
|
||||||
use crate::core::error::Result;
|
use crate::core::error::Result;
|
||||||
use crate::core::payloads::{StorePayloadOptions, store_payload};
|
use crate::core::payloads::{StorePayloadOptions, store_payload};
|
||||||
|
use crate::documents::SourceType;
|
||||||
|
use crate::ingestion::dirty_tracker;
|
||||||
use crate::gitlab::GitLabClient;
|
use crate::gitlab::GitLabClient;
|
||||||
use crate::gitlab::transformers::{NoteableRef, transform_discussion, transform_notes};
|
use crate::gitlab::transformers::{NoteableRef, transform_discussion, transform_notes};
|
||||||
|
|
||||||
@@ -55,7 +57,7 @@ pub async fn ingest_issue_discussions(
|
|||||||
total_result.stale_discussions_removed += result.stale_discussions_removed;
|
total_result.stale_discussions_removed += result.stale_discussions_removed;
|
||||||
}
|
}
|
||||||
|
|
||||||
info!(
|
debug!(
|
||||||
issues_processed = issues.len(),
|
issues_processed = issues.len(),
|
||||||
discussions_fetched = total_result.discussions_fetched,
|
discussions_fetched = total_result.discussions_fetched,
|
||||||
discussions_upserted = total_result.discussions_upserted,
|
discussions_upserted = total_result.discussions_upserted,
|
||||||
@@ -90,7 +92,7 @@ async fn ingest_discussions_for_issue(
|
|||||||
// Track discussions we've seen for stale removal
|
// Track discussions we've seen for stale removal
|
||||||
let mut seen_discussion_ids: Vec<String> = Vec::new();
|
let mut seen_discussion_ids: Vec<String> = Vec::new();
|
||||||
// Track if any error occurred during pagination
|
// Track if any error occurred during pagination
|
||||||
let mut pagination_error: Option<crate::core::error::GiError> = None;
|
let mut pagination_error: Option<crate::core::error::LoreError> = None;
|
||||||
|
|
||||||
while let Some(disc_result) = discussions_stream.next().await {
|
while let Some(disc_result) = discussions_stream.next().await {
|
||||||
|
|
||||||
@@ -141,6 +143,9 @@ async fn ingest_discussions_for_issue(
|
|||||||
|row| row.get(0),
|
|row| row.get(0),
|
||||||
)?;
|
)?;
|
||||||
|
|
||||||
|
// Mark dirty for document regeneration (inside transaction)
|
||||||
|
dirty_tracker::mark_dirty_tx(&tx, SourceType::Discussion, local_discussion_id)?;
|
||||||
|
|
||||||
// Transform and store notes
|
// Transform and store notes
|
||||||
let notes = transform_notes(&gitlab_discussion, local_project_id);
|
let notes = transform_notes(&gitlab_discussion, local_project_id);
|
||||||
let notes_count = notes.len();
|
let notes_count = notes.len();
|
||||||
|
|||||||
@@ -14,9 +14,11 @@ use rusqlite::{Connection, Transaction};
|
|||||||
use tracing::{debug, info, warn};
|
use tracing::{debug, info, warn};
|
||||||
|
|
||||||
use crate::Config;
|
use crate::Config;
|
||||||
use crate::core::error::{GiError, Result};
|
use crate::core::error::{LoreError, Result};
|
||||||
use crate::core::payloads::{StorePayloadOptions, store_payload};
|
use crate::core::payloads::{StorePayloadOptions, store_payload};
|
||||||
use crate::core::time::now_ms;
|
use crate::core::time::now_ms;
|
||||||
|
use crate::documents::SourceType;
|
||||||
|
use crate::ingestion::dirty_tracker;
|
||||||
use crate::gitlab::GitLabClient;
|
use crate::gitlab::GitLabClient;
|
||||||
use crate::gitlab::transformers::{MilestoneRow, transform_issue};
|
use crate::gitlab::transformers::{MilestoneRow, transform_issue};
|
||||||
use crate::gitlab::types::GitLabIssue;
|
use crate::gitlab::types::GitLabIssue;
|
||||||
@@ -297,6 +299,9 @@ fn process_issue_in_transaction(
|
|||||||
|row| row.get(0),
|
|row| row.get(0),
|
||||||
)?;
|
)?;
|
||||||
|
|
||||||
|
// Mark dirty for document regeneration (inside transaction)
|
||||||
|
dirty_tracker::mark_dirty_tx(tx, SourceType::Issue, local_issue_id)?;
|
||||||
|
|
||||||
// Clear existing label links (stale removal)
|
// Clear existing label links (stale removal)
|
||||||
tx.execute(
|
tx.execute(
|
||||||
"DELETE FROM issue_labels WHERE issue_id = ?",
|
"DELETE FROM issue_labels WHERE issue_id = ?",
|
||||||
@@ -470,7 +475,7 @@ fn get_issues_needing_discussion_sync(
|
|||||||
fn parse_timestamp(ts: &str) -> Result<i64> {
|
fn parse_timestamp(ts: &str) -> Result<i64> {
|
||||||
chrono::DateTime::parse_from_rfc3339(ts)
|
chrono::DateTime::parse_from_rfc3339(ts)
|
||||||
.map(|dt| dt.timestamp_millis())
|
.map(|dt| dt.timestamp_millis())
|
||||||
.map_err(|e| GiError::Other(format!("Failed to parse timestamp '{}': {}", ts, e)))
|
.map_err(|e| LoreError::Other(format!("Failed to parse timestamp '{}': {}", ts, e)))
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
|
|||||||
@@ -13,9 +13,11 @@ use rusqlite::{Connection, Transaction, params};
|
|||||||
use tracing::{debug, info, warn};
|
use tracing::{debug, info, warn};
|
||||||
|
|
||||||
use crate::Config;
|
use crate::Config;
|
||||||
use crate::core::error::{GiError, Result};
|
use crate::core::error::{LoreError, Result};
|
||||||
use crate::core::payloads::{StorePayloadOptions, store_payload};
|
use crate::core::payloads::{StorePayloadOptions, store_payload};
|
||||||
use crate::core::time::now_ms;
|
use crate::core::time::now_ms;
|
||||||
|
use crate::documents::SourceType;
|
||||||
|
use crate::ingestion::dirty_tracker;
|
||||||
use crate::gitlab::GitLabClient;
|
use crate::gitlab::GitLabClient;
|
||||||
use crate::gitlab::transformers::merge_request::transform_merge_request;
|
use crate::gitlab::transformers::merge_request::transform_merge_request;
|
||||||
use crate::gitlab::types::GitLabMergeRequest;
|
use crate::gitlab::types::GitLabMergeRequest;
|
||||||
@@ -166,7 +168,7 @@ fn process_single_mr(
|
|||||||
// Transform MR first (outside transaction - no DB access)
|
// Transform MR first (outside transaction - no DB access)
|
||||||
let payload_json = serde_json::to_value(mr)?;
|
let payload_json = serde_json::to_value(mr)?;
|
||||||
let transformed = transform_merge_request(mr, project_id)
|
let transformed = transform_merge_request(mr, project_id)
|
||||||
.map_err(|e| GiError::Other(format!("MR transform failed: {}", e)))?;
|
.map_err(|e| LoreError::Other(format!("MR transform failed: {}", e)))?;
|
||||||
|
|
||||||
// Wrap all DB operations in a transaction for atomicity
|
// Wrap all DB operations in a transaction for atomicity
|
||||||
let tx = conn.unchecked_transaction()?;
|
let tx = conn.unchecked_transaction()?;
|
||||||
@@ -263,6 +265,9 @@ fn process_mr_in_transaction(
|
|||||||
|row| row.get(0),
|
|row| row.get(0),
|
||||||
)?;
|
)?;
|
||||||
|
|
||||||
|
// Mark dirty for document regeneration (inside transaction)
|
||||||
|
dirty_tracker::mark_dirty_tx(tx, SourceType::MergeRequest, local_mr_id)?;
|
||||||
|
|
||||||
// Clear-and-relink labels
|
// Clear-and-relink labels
|
||||||
tx.execute(
|
tx.execute(
|
||||||
"DELETE FROM mr_labels WHERE merge_request_id = ?",
|
"DELETE FROM mr_labels WHERE merge_request_id = ?",
|
||||||
@@ -448,7 +453,7 @@ pub fn get_mrs_needing_discussion_sync(
|
|||||||
fn parse_timestamp(ts: &str) -> Result<i64> {
|
fn parse_timestamp(ts: &str) -> Result<i64> {
|
||||||
chrono::DateTime::parse_from_rfc3339(ts)
|
chrono::DateTime::parse_from_rfc3339(ts)
|
||||||
.map(|dt| dt.timestamp_millis())
|
.map(|dt| dt.timestamp_millis())
|
||||||
.map_err(|e| GiError::Other(format!("Failed to parse timestamp '{}': {}", ts, e)))
|
.map_err(|e| LoreError::Other(format!("Failed to parse timestamp '{}': {}", ts, e)))
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
|
|||||||
@@ -18,6 +18,8 @@ use crate::Config;
|
|||||||
use crate::core::error::Result;
|
use crate::core::error::Result;
|
||||||
use crate::core::payloads::{StorePayloadOptions, store_payload};
|
use crate::core::payloads::{StorePayloadOptions, store_payload};
|
||||||
use crate::core::time::now_ms;
|
use crate::core::time::now_ms;
|
||||||
|
use crate::documents::SourceType;
|
||||||
|
use crate::ingestion::dirty_tracker;
|
||||||
use crate::gitlab::GitLabClient;
|
use crate::gitlab::GitLabClient;
|
||||||
use crate::gitlab::transformers::{
|
use crate::gitlab::transformers::{
|
||||||
NormalizedDiscussion, NormalizedNote, transform_mr_discussion,
|
NormalizedDiscussion, NormalizedNote, transform_mr_discussion,
|
||||||
@@ -189,6 +191,9 @@ pub fn write_prefetched_mr_discussions(
|
|||||||
|row| row.get(0),
|
|row| row.get(0),
|
||||||
)?;
|
)?;
|
||||||
|
|
||||||
|
// Mark dirty for document regeneration (inside transaction)
|
||||||
|
dirty_tracker::mark_dirty_tx(&tx, SourceType::Discussion, local_discussion_id)?;
|
||||||
|
|
||||||
// Upsert notes
|
// Upsert notes
|
||||||
for note in &disc.notes {
|
for note in &disc.notes {
|
||||||
let should_store_payload = !note.is_system
|
let should_store_payload = !note.is_system
|
||||||
@@ -402,6 +407,9 @@ async fn ingest_discussions_for_mr(
|
|||||||
|row| row.get(0),
|
|row| row.get(0),
|
||||||
)?;
|
)?;
|
||||||
|
|
||||||
|
// Mark dirty for document regeneration (inside transaction)
|
||||||
|
dirty_tracker::mark_dirty_tx(&tx, SourceType::Discussion, local_discussion_id)?;
|
||||||
|
|
||||||
// Upsert notes (not delete-all-then-insert)
|
// Upsert notes (not delete-all-then-insert)
|
||||||
for note in &notes {
|
for note in &notes {
|
||||||
// Selective payload storage: skip system notes without position
|
// Selective payload storage: skip system notes without position
|
||||||
|
|||||||
Reference in New Issue
Block a user