//! MR discussion ingestion.
//!
//! Prefetches GitLab MR discussions, writes normalized discussion and note
//! rows, marks note documents dirty for re-extraction, and sweeps stale rows
//! while advancing the per-MR discussion-sync watermark.
use futures::StreamExt;
|
|
use rusqlite::{Connection, params};
|
|
use tracing::{debug, info, warn};
|
|
|
|
use crate::Config;
|
|
use crate::core::error::Result;
|
|
use crate::core::payloads::{StorePayloadOptions, store_payload};
|
|
use crate::core::time::now_ms;
|
|
use crate::documents::SourceType;
|
|
use crate::gitlab::GitLabClient;
|
|
use crate::gitlab::transformers::{
|
|
NormalizedDiscussion, NormalizedNote, transform_mr_discussion,
|
|
transform_notes_with_diff_position,
|
|
};
|
|
use crate::gitlab::types::GitLabDiscussion;
|
|
use crate::ingestion::dirty_tracker;
|
|
use crate::ingestion::discussions::NoteUpsertOutcome;
|
|
|
|
use super::merge_requests::MrForDiscussionSync;
|
|
|
|
/// Aggregate counters for one MR-discussion ingestion pass.
///
/// NOTE: `Default` yields `pagination_succeeded = false`; callers that start
/// optimistic set it to `true` explicitly before accumulating.
#[derive(Debug, Default)]
pub struct IngestMrDiscussionsResult {
    // Discussions received from the GitLab API.
    pub discussions_fetched: usize,
    // Discussions written (inserted or updated) locally.
    pub discussions_upserted: usize,
    // Notes written across all upserted discussions.
    pub notes_upserted: usize,
    // Notes skipped because their discussion failed note transformation.
    pub notes_skipped_bad_timestamp: usize,
    // Notes carrying a diff position (old or new path present).
    pub diffnotes_count: usize,
    // True when fetching/pagination completed without errors.
    pub pagination_succeeded: bool,
}
|
|
|
|
/// Discussions fetched and transformed for a single MR, ahead of the
/// database write phase.
#[derive(Debug)]
pub struct PrefetchedMrDiscussions {
    // The MR the discussions belong to.
    pub mr: MrForDiscussionSync,
    // Successfully transformed discussions (empty when `fetch_error` is set).
    pub discussions: Vec<PrefetchedDiscussion>,
    // Set when the API fetch failed; the write phase records it and bails.
    pub fetch_error: Option<String>,
    // True if any discussion's notes failed to transform.
    pub had_transform_errors: bool,
    // Raw notes dropped due to transform failures.
    pub notes_skipped_count: usize,
}
|
|
|
|
/// One raw discussion plus its normalized forms, ready to persist.
#[derive(Debug)]
pub struct PrefetchedDiscussion {
    // Raw API payload (also stored verbatim as a payload blob).
    pub raw: GitLabDiscussion,
    // Normalized discussion row.
    pub normalized: NormalizedDiscussion,
    // Normalized notes with diff-position data resolved.
    pub notes: Vec<NormalizedNote>,
}
|
|
|
|
pub async fn prefetch_mr_discussions(
|
|
client: &GitLabClient,
|
|
gitlab_project_id: i64,
|
|
local_project_id: i64,
|
|
mr: MrForDiscussionSync,
|
|
) -> PrefetchedMrDiscussions {
|
|
debug!(mr_iid = mr.iid, "Prefetching discussions for MR");
|
|
|
|
let raw_discussions = match client
|
|
.fetch_all_mr_discussions(gitlab_project_id, mr.iid)
|
|
.await
|
|
{
|
|
Ok(d) => d,
|
|
Err(e) => {
|
|
return PrefetchedMrDiscussions {
|
|
mr,
|
|
discussions: Vec::new(),
|
|
fetch_error: Some(e.to_string()),
|
|
had_transform_errors: false,
|
|
notes_skipped_count: 0,
|
|
};
|
|
}
|
|
};
|
|
|
|
let mut discussions = Vec::with_capacity(raw_discussions.len());
|
|
let mut had_transform_errors = false;
|
|
let mut notes_skipped_count = 0;
|
|
|
|
for raw in raw_discussions {
|
|
let notes = match transform_notes_with_diff_position(&raw, local_project_id) {
|
|
Ok(n) => n,
|
|
Err(e) => {
|
|
warn!(
|
|
mr_iid = mr.iid,
|
|
discussion_id = %raw.id,
|
|
error = %e,
|
|
"Note transform failed during prefetch"
|
|
);
|
|
had_transform_errors = true;
|
|
notes_skipped_count += raw.notes.len();
|
|
continue;
|
|
}
|
|
};
|
|
|
|
let normalized = transform_mr_discussion(&raw, local_project_id, mr.local_mr_id);
|
|
|
|
discussions.push(PrefetchedDiscussion {
|
|
raw,
|
|
normalized,
|
|
notes,
|
|
});
|
|
}
|
|
|
|
PrefetchedMrDiscussions {
|
|
mr,
|
|
discussions,
|
|
fetch_error: None,
|
|
had_transform_errors,
|
|
notes_skipped_count,
|
|
}
|
|
}
|
|
|
|
/// Persist prefetched discussions and notes for one MR.
///
/// Each discussion is written in its own transaction. On a fully clean run
/// (no fetch or transform errors) stale rows are swept and the per-MR
/// watermark is advanced; otherwise the watermark is left alone so the next
/// sync retries.
pub fn write_prefetched_mr_discussions(
    conn: &Connection,
    config: &Config,
    local_project_id: i64,
    prefetched: PrefetchedMrDiscussions,
) -> Result<IngestMrDiscussionsResult> {
    // "Succeeded" means both the fetch and every note transform worked.
    let sync_succeeded = prefetched.fetch_error.is_none() && !prefetched.had_transform_errors;

    let mut result = IngestMrDiscussionsResult {
        pagination_succeeded: sync_succeeded,
        notes_skipped_bad_timestamp: prefetched.notes_skipped_count,
        ..Default::default()
    };

    let mr = &prefetched.mr;

    // Fetch failure: record sync health and return the (empty) result early.
    if let Some(error) = &prefetched.fetch_error {
        warn!(mr_iid = mr.iid, error = %error, "Prefetch failed for MR");
        record_sync_health_error(conn, mr.local_mr_id, error)?;
        return Ok(result);
    }

    // Single timestamp for the whole run; rows with last_seen_at older than
    // this are considered stale during the sweep below.
    let run_seen_at = now_ms();

    for disc in &prefetched.discussions {
        let diffnotes_in_disc = disc
            .notes
            .iter()
            .filter(|n| n.position_new_path.is_some() || n.position_old_path.is_some())
            .count();
        let notes_in_disc = disc.notes.len();

        // One transaction per discussion so a failure mid-batch does not
        // roll back earlier discussions.
        let tx = conn.unchecked_transaction()?;

        // Store the raw discussion payload for auditing/replay.
        let payload_bytes = serde_json::to_vec(&disc.raw)?;
        let payload_id = Some(store_payload(
            &tx,
            StorePayloadOptions {
                project_id: Some(local_project_id),
                resource_type: "discussion",
                gitlab_id: &disc.raw.id,
                json_bytes: &payload_bytes,
                compress: config.storage.compress_raw_payloads,
            },
        )?);

        upsert_discussion(&tx, &disc.normalized, run_seen_at, payload_id)?;

        // Re-read the local row id (upsert may have inserted or updated).
        let local_discussion_id: i64 = tx.query_row(
            "SELECT id FROM discussions WHERE project_id = ? AND gitlab_discussion_id = ?",
            params![local_project_id, &disc.normalized.gitlab_discussion_id],
            |row| row.get(0),
        )?;

        dirty_tracker::mark_dirty_tx(&tx, SourceType::Discussion, local_discussion_id)?;

        // Mark child note documents dirty (they inherit parent metadata)
        tx.execute(
            "INSERT INTO dirty_sources (source_type, source_id, queued_at)
             SELECT 'note', n.id, ?1
             FROM notes n
             WHERE n.discussion_id = ?2 AND n.is_system = 0
             ON CONFLICT(source_type, source_id) DO UPDATE SET queued_at = excluded.queued_at, attempt_count = 0",
            params![now_ms(), local_discussion_id],
        )?;

        for note in &disc.notes {
            // Raw note payloads are kept for human notes and any diff note.
            let should_store_payload = !note.is_system
                || note.position_new_path.is_some()
                || note.position_old_path.is_some();

            let note_payload_id = if should_store_payload {
                let note_data = disc.raw.notes.iter().find(|n| n.id == note.gitlab_id);
                if let Some(note_data) = note_data {
                    let note_payload_bytes = serde_json::to_vec(note_data)?;
                    Some(store_payload(
                        &tx,
                        StorePayloadOptions {
                            project_id: Some(local_project_id),
                            resource_type: "note",
                            gitlab_id: &note.gitlab_id.to_string(),
                            json_bytes: &note_payload_bytes,
                            compress: config.storage.compress_raw_payloads,
                        },
                    )?)
                } else {
                    None
                }
            } else {
                None
            };

            let outcome =
                upsert_note(&tx, local_discussion_id, note, run_seen_at, note_payload_id)?;
            // Only semantically-changed human notes are re-queued individually.
            if !note.is_system && outcome.changed_semantics {
                dirty_tracker::mark_dirty_tx(&tx, SourceType::Note, outcome.local_note_id)?;
            }
        }

        tx.commit()?;

        result.discussions_fetched += 1;
        result.discussions_upserted += 1;
        result.notes_upserted += notes_in_disc;
        result.diffnotes_count += diffnotes_in_disc;
    }

    if sync_succeeded {
        // Clean run: remove rows not seen this run, then advance the watermark.
        sweep_stale_discussions(conn, mr.local_mr_id, run_seen_at)?;
        sweep_stale_notes(conn, local_project_id, mr.local_mr_id, run_seen_at)?;
        mark_discussions_synced(conn, mr.local_mr_id, mr.updated_at)?;
        clear_sync_health_error(conn, mr.local_mr_id)?;

        debug!(
            mr_iid = mr.iid,
            "MR discussion sync complete, watermark advanced"
        );
    } else if prefetched.had_transform_errors {
        warn!(
            mr_iid = mr.iid,
            notes_skipped = prefetched.notes_skipped_count,
            "Transform errors occurred; watermark NOT advanced to preserve data"
        );
    }

    Ok(result)
}
|
|
|
|
pub async fn ingest_mr_discussions(
|
|
conn: &Connection,
|
|
client: &GitLabClient,
|
|
config: &Config,
|
|
gitlab_project_id: i64,
|
|
local_project_id: i64,
|
|
mrs: &[MrForDiscussionSync],
|
|
) -> Result<IngestMrDiscussionsResult> {
|
|
let mut total_result = IngestMrDiscussionsResult {
|
|
pagination_succeeded: true,
|
|
..Default::default()
|
|
};
|
|
|
|
for mr in mrs {
|
|
let result = ingest_discussions_for_mr(
|
|
conn,
|
|
client,
|
|
config,
|
|
gitlab_project_id,
|
|
local_project_id,
|
|
mr,
|
|
)
|
|
.await?;
|
|
|
|
total_result.discussions_fetched += result.discussions_fetched;
|
|
total_result.discussions_upserted += result.discussions_upserted;
|
|
total_result.notes_upserted += result.notes_upserted;
|
|
total_result.notes_skipped_bad_timestamp += result.notes_skipped_bad_timestamp;
|
|
total_result.diffnotes_count += result.diffnotes_count;
|
|
if !result.pagination_succeeded {
|
|
total_result.pagination_succeeded = false;
|
|
}
|
|
}
|
|
|
|
info!(
|
|
mrs_processed = mrs.len(),
|
|
discussions_fetched = total_result.discussions_fetched,
|
|
discussions_upserted = total_result.discussions_upserted,
|
|
notes_upserted = total_result.notes_upserted,
|
|
notes_skipped = total_result.notes_skipped_bad_timestamp,
|
|
diffnotes = total_result.diffnotes_count,
|
|
pagination_succeeded = total_result.pagination_succeeded,
|
|
"MR discussion ingestion complete"
|
|
);
|
|
|
|
Ok(total_result)
|
|
}
|
|
|
|
async fn ingest_discussions_for_mr(
|
|
conn: &Connection,
|
|
client: &GitLabClient,
|
|
config: &Config,
|
|
gitlab_project_id: i64,
|
|
local_project_id: i64,
|
|
mr: &MrForDiscussionSync,
|
|
) -> Result<IngestMrDiscussionsResult> {
|
|
let mut result = IngestMrDiscussionsResult {
|
|
pagination_succeeded: true,
|
|
..Default::default()
|
|
};
|
|
|
|
debug!(
|
|
mr_iid = mr.iid,
|
|
local_mr_id = mr.local_mr_id,
|
|
"Fetching discussions for MR"
|
|
);
|
|
|
|
let run_seen_at = now_ms();
|
|
|
|
let mut discussions_stream = client.paginate_mr_discussions(gitlab_project_id, mr.iid);
|
|
|
|
let mut received_first_response = false;
|
|
|
|
while let Some(disc_result) = discussions_stream.next().await {
|
|
if !received_first_response {
|
|
received_first_response = true;
|
|
}
|
|
|
|
let gitlab_discussion = match disc_result {
|
|
Ok(d) => d,
|
|
Err(e) => {
|
|
warn!(
|
|
mr_iid = mr.iid,
|
|
error = %e,
|
|
"Error during MR discussion pagination"
|
|
);
|
|
result.pagination_succeeded = false;
|
|
record_sync_health_error(conn, mr.local_mr_id, &e.to_string())?;
|
|
break;
|
|
}
|
|
};
|
|
let notes = match transform_notes_with_diff_position(&gitlab_discussion, local_project_id) {
|
|
Ok(notes) => notes,
|
|
Err(e) => {
|
|
warn!(
|
|
mr_iid = mr.iid,
|
|
discussion_id = %gitlab_discussion.id,
|
|
error = %e,
|
|
"Note transform failed; preserving existing notes"
|
|
);
|
|
result.notes_skipped_bad_timestamp += gitlab_discussion.notes.len();
|
|
result.pagination_succeeded = false;
|
|
continue;
|
|
}
|
|
};
|
|
|
|
let diffnotes_in_disc = notes
|
|
.iter()
|
|
.filter(|n| n.position_new_path.is_some() || n.position_old_path.is_some())
|
|
.count();
|
|
let notes_count = notes.len();
|
|
|
|
let normalized_discussion =
|
|
transform_mr_discussion(&gitlab_discussion, local_project_id, mr.local_mr_id);
|
|
|
|
let tx = conn.unchecked_transaction()?;
|
|
|
|
let payload_bytes = serde_json::to_vec(&gitlab_discussion)?;
|
|
let payload_id = Some(store_payload(
|
|
&tx,
|
|
StorePayloadOptions {
|
|
project_id: Some(local_project_id),
|
|
resource_type: "discussion",
|
|
gitlab_id: &gitlab_discussion.id,
|
|
json_bytes: &payload_bytes,
|
|
compress: config.storage.compress_raw_payloads,
|
|
},
|
|
)?);
|
|
|
|
upsert_discussion(&tx, &normalized_discussion, run_seen_at, payload_id)?;
|
|
|
|
let local_discussion_id: i64 = tx.query_row(
|
|
"SELECT id FROM discussions WHERE project_id = ? AND gitlab_discussion_id = ?",
|
|
params![
|
|
local_project_id,
|
|
&normalized_discussion.gitlab_discussion_id
|
|
],
|
|
|row| row.get(0),
|
|
)?;
|
|
|
|
dirty_tracker::mark_dirty_tx(&tx, SourceType::Discussion, local_discussion_id)?;
|
|
|
|
// Mark child note documents dirty (they inherit parent metadata)
|
|
tx.execute(
|
|
"INSERT INTO dirty_sources (source_type, source_id, queued_at)
|
|
SELECT 'note', n.id, ?1
|
|
FROM notes n
|
|
WHERE n.discussion_id = ?2 AND n.is_system = 0
|
|
ON CONFLICT(source_type, source_id) DO UPDATE SET queued_at = excluded.queued_at, attempt_count = 0",
|
|
params![now_ms(), local_discussion_id],
|
|
)?;
|
|
|
|
for note in ¬es {
|
|
let should_store_payload = !note.is_system
|
|
|| note.position_new_path.is_some()
|
|
|| note.position_old_path.is_some();
|
|
|
|
let note_payload_id = if should_store_payload {
|
|
let note_data = gitlab_discussion
|
|
.notes
|
|
.iter()
|
|
.find(|n| n.id == note.gitlab_id);
|
|
if let Some(note_data) = note_data {
|
|
let note_payload_bytes = serde_json::to_vec(note_data)?;
|
|
Some(store_payload(
|
|
&tx,
|
|
StorePayloadOptions {
|
|
project_id: Some(local_project_id),
|
|
resource_type: "note",
|
|
gitlab_id: ¬e.gitlab_id.to_string(),
|
|
json_bytes: ¬e_payload_bytes,
|
|
compress: config.storage.compress_raw_payloads,
|
|
},
|
|
)?)
|
|
} else {
|
|
None
|
|
}
|
|
} else {
|
|
None
|
|
};
|
|
|
|
let outcome =
|
|
upsert_note(&tx, local_discussion_id, note, run_seen_at, note_payload_id)?;
|
|
if !note.is_system && outcome.changed_semantics {
|
|
dirty_tracker::mark_dirty_tx(&tx, SourceType::Note, outcome.local_note_id)?;
|
|
}
|
|
}
|
|
|
|
tx.commit()?;
|
|
|
|
result.discussions_fetched += 1;
|
|
result.discussions_upserted += 1;
|
|
result.notes_upserted += notes_count;
|
|
result.diffnotes_count += diffnotes_in_disc;
|
|
}
|
|
|
|
if result.pagination_succeeded && received_first_response {
|
|
sweep_stale_discussions(conn, mr.local_mr_id, run_seen_at)?;
|
|
|
|
sweep_stale_notes(conn, local_project_id, mr.local_mr_id, run_seen_at)?;
|
|
|
|
mark_discussions_synced(conn, mr.local_mr_id, mr.updated_at)?;
|
|
clear_sync_health_error(conn, mr.local_mr_id)?;
|
|
|
|
debug!(
|
|
mr_iid = mr.iid,
|
|
"MR discussion sync complete, watermark advanced"
|
|
);
|
|
} else if result.pagination_succeeded && !received_first_response {
|
|
sweep_stale_discussions(conn, mr.local_mr_id, run_seen_at)?;
|
|
sweep_stale_notes(conn, local_project_id, mr.local_mr_id, run_seen_at)?;
|
|
mark_discussions_synced(conn, mr.local_mr_id, mr.updated_at)?;
|
|
clear_sync_health_error(conn, mr.local_mr_id)?;
|
|
} else {
|
|
warn!(
|
|
mr_iid = mr.iid,
|
|
discussions_seen = result.discussions_upserted,
|
|
notes_skipped = result.notes_skipped_bad_timestamp,
|
|
"Watermark NOT advanced; will retry on next sync"
|
|
);
|
|
}
|
|
|
|
Ok(result)
|
|
}
|
|
|
|
/// Insert or update a discussion row keyed on (project_id, gitlab_discussion_id).
///
/// On conflict, identity columns (issue/MR links, noteable_type,
/// individual_note) are left untouched; only the mutable timestamps and
/// resolution state are refreshed. `raw_payload_id` is COALESCEd so an upsert
/// without a new payload does not clear a previously stored one.
fn upsert_discussion(
    conn: &Connection,
    discussion: &crate::gitlab::transformers::NormalizedDiscussion,
    last_seen_at: i64,
    payload_id: Option<i64>,
) -> Result<()> {
    conn.execute(
        "INSERT INTO discussions (
            gitlab_discussion_id, project_id, issue_id, merge_request_id, noteable_type,
            individual_note, first_note_at, last_note_at, last_seen_at,
            resolvable, resolved, raw_payload_id
        ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12)
        ON CONFLICT(project_id, gitlab_discussion_id) DO UPDATE SET
            first_note_at = excluded.first_note_at,
            last_note_at = excluded.last_note_at,
            last_seen_at = excluded.last_seen_at,
            resolvable = excluded.resolvable,
            resolved = excluded.resolved,
            raw_payload_id = COALESCE(excluded.raw_payload_id, raw_payload_id)",
        params![
            &discussion.gitlab_discussion_id,
            discussion.project_id,
            discussion.issue_id,
            discussion.merge_request_id,
            &discussion.noteable_type,
            discussion.individual_note,
            discussion.first_note_at,
            discussion.last_note_at,
            last_seen_at,
            discussion.resolvable,
            discussion.resolved,
            payload_id,
        ],
    )?;
    Ok(())
}
|
|
|
|
/// Insert or update a note row keyed on `gitlab_id`, reporting whether any
/// semantically meaningful field changed.
///
/// The pre-read MUST happen before the upsert: `changed_semantics` compares
/// the stored body/type/resolution/position fields against the incoming note
/// (a missing row counts as changed). Callers use it to decide whether to
/// re-queue the note document for extraction, so label/timestamp-only churn
/// does not trigger re-processing.
fn upsert_note(
    conn: &Connection,
    discussion_id: i64,
    note: &NormalizedNote,
    last_seen_at: i64,
    payload_id: Option<i64>,
) -> Result<NoteUpsertOutcome> {
    // Pre-read for semantic change detection
    let existing = conn
        .query_row(
            "SELECT id, body, note_type, resolved, resolved_by,
                    position_old_path, position_new_path, position_old_line, position_new_line,
                    position_type, position_line_range_start, position_line_range_end,
                    position_base_sha, position_start_sha, position_head_sha
             FROM notes WHERE gitlab_id = ?",
            params![note.gitlab_id],
            |row| {
                Ok((
                    row.get::<_, i64>(0)?,
                    row.get::<_, String>(1)?,
                    row.get::<_, Option<String>>(2)?,
                    row.get::<_, bool>(3)?,
                    row.get::<_, Option<String>>(4)?,
                    row.get::<_, Option<String>>(5)?,
                    row.get::<_, Option<String>>(6)?,
                    row.get::<_, Option<i32>>(7)?,
                    row.get::<_, Option<i32>>(8)?,
                    row.get::<_, Option<String>>(9)?,
                    row.get::<_, Option<i32>>(10)?,
                    row.get::<_, Option<i32>>(11)?,
                    row.get::<_, Option<String>>(12)?,
                    row.get::<_, Option<String>>(13)?,
                    row.get::<_, Option<String>>(14)?,
                ))
            },
        )
        // NOTE(review): .ok() also swallows real query errors, treating them
        // as "no existing row" (=> changed_semantics = true). That is
        // conservative but masks DB failures — confirm this is intended.
        .ok();

    // A note is "semantically changed" if any content, resolution, or diff
    // position field differs from what is stored; brand-new notes always are.
    let changed_semantics = match &existing {
        Some((
            _id,
            body,
            note_type,
            resolved,
            resolved_by,
            pos_old_path,
            pos_new_path,
            pos_old_line,
            pos_new_line,
            pos_type,
            pos_range_start,
            pos_range_end,
            pos_base_sha,
            pos_start_sha,
            pos_head_sha,
        )) => {
            *body != note.body
                || *note_type != note.note_type
                || *resolved != note.resolved
                || *resolved_by != note.resolved_by
                || *pos_old_path != note.position_old_path
                || *pos_new_path != note.position_new_path
                || *pos_old_line != note.position_old_line
                || *pos_new_line != note.position_new_line
                || *pos_type != note.position_type
                || *pos_range_start != note.position_line_range_start
                || *pos_range_end != note.position_line_range_end
                || *pos_base_sha != note.position_base_sha
                || *pos_start_sha != note.position_start_sha
                || *pos_head_sha != note.position_head_sha
        }
        None => true,
    };

    // Identity columns (discussion_id, project_id, is_system, author_username,
    // created_at, position ordinal) are intentionally NOT updated on conflict.
    // raw_payload_id is COALESCEd so a payload-less upsert keeps the old blob.
    conn.execute(
        "INSERT INTO notes (
            gitlab_id, discussion_id, project_id, note_type, is_system,
            author_id, author_username, body, created_at, updated_at, last_seen_at,
            position, resolvable, resolved, resolved_by, resolved_at,
            position_old_path, position_new_path, position_old_line, position_new_line,
            position_type, position_line_range_start, position_line_range_end,
            position_base_sha, position_start_sha, position_head_sha,
            raw_payload_id
        ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16, ?17, ?18, ?19, ?20, ?21, ?22, ?23, ?24, ?25, ?26, ?27)
        ON CONFLICT(gitlab_id) DO UPDATE SET
            note_type = excluded.note_type,
            author_id = excluded.author_id,
            body = excluded.body,
            updated_at = excluded.updated_at,
            last_seen_at = excluded.last_seen_at,
            resolvable = excluded.resolvable,
            resolved = excluded.resolved,
            resolved_by = excluded.resolved_by,
            resolved_at = excluded.resolved_at,
            position_old_path = excluded.position_old_path,
            position_new_path = excluded.position_new_path,
            position_old_line = excluded.position_old_line,
            position_new_line = excluded.position_new_line,
            position_type = excluded.position_type,
            position_line_range_start = excluded.position_line_range_start,
            position_line_range_end = excluded.position_line_range_end,
            position_base_sha = excluded.position_base_sha,
            position_start_sha = excluded.position_start_sha,
            position_head_sha = excluded.position_head_sha,
            raw_payload_id = COALESCE(excluded.raw_payload_id, raw_payload_id)",
        params![
            note.gitlab_id,
            discussion_id,
            note.project_id,
            &note.note_type,
            note.is_system,
            note.author_id,
            &note.author_username,
            &note.body,
            note.created_at,
            note.updated_at,
            last_seen_at,
            note.position,
            note.resolvable,
            note.resolved,
            &note.resolved_by,
            note.resolved_at,
            &note.position_old_path,
            &note.position_new_path,
            note.position_old_line,
            note.position_new_line,
            &note.position_type,
            note.position_line_range_start,
            note.position_line_range_end,
            &note.position_base_sha,
            &note.position_start_sha,
            &note.position_head_sha,
            payload_id,
        ],
    )?;

    // Re-read the local id (row may have just been inserted).
    let local_note_id: i64 = conn.query_row(
        "SELECT id FROM notes WHERE gitlab_id = ?",
        params![note.gitlab_id],
        |row| row.get(0),
    )?;

    Ok(NoteUpsertOutcome {
        local_note_id,
        changed_semantics,
    })
}
|
|
|
|
fn sweep_stale_discussions(conn: &Connection, local_mr_id: i64, run_seen_at: i64) -> Result<usize> {
|
|
let deleted = conn.execute(
|
|
"DELETE FROM discussions
|
|
WHERE merge_request_id = ? AND last_seen_at < ?",
|
|
params![local_mr_id, run_seen_at],
|
|
)?;
|
|
if deleted > 0 {
|
|
debug!(local_mr_id, deleted, "Swept stale discussions");
|
|
}
|
|
Ok(deleted)
|
|
}
|
|
|
|
/// Delete notes under `local_mr_id` not seen in the current run, along with
/// their derived state. Returns the number of note rows removed.
///
/// Deletion order matters: dependent documents and dirty-queue entries are
/// removed first (their subqueries reference the notes rows), then the notes
/// themselves. Note that steps 1-2 filter on `is_system = 0` while step 3
/// deletes stale system notes too — system notes never have documents or
/// dirty entries, so there is nothing dependent to clean up for them.
fn sweep_stale_notes(
    conn: &Connection,
    local_project_id: i64,
    local_mr_id: i64,
    run_seen_at: i64,
) -> Result<usize> {
    // Step 1: Delete note documents for stale notes
    conn.execute(
        "DELETE FROM documents WHERE source_type = 'note' AND source_id IN
            (SELECT id FROM notes
             WHERE project_id = ?1
               AND discussion_id IN (SELECT id FROM discussions WHERE merge_request_id = ?2)
               AND last_seen_at < ?3
               AND is_system = 0)",
        params![local_project_id, local_mr_id, run_seen_at],
    )?;

    // Step 2: Delete dirty_sources entries for stale notes
    conn.execute(
        "DELETE FROM dirty_sources WHERE source_type = 'note' AND source_id IN
            (SELECT id FROM notes
             WHERE project_id = ?1
               AND discussion_id IN (SELECT id FROM discussions WHERE merge_request_id = ?2)
               AND last_seen_at < ?3
               AND is_system = 0)",
        params![local_project_id, local_mr_id, run_seen_at],
    )?;

    // Step 3: Delete the stale notes themselves
    let deleted = conn.execute(
        "DELETE FROM notes
         WHERE project_id = ?1
           AND discussion_id IN (
               SELECT id FROM discussions WHERE merge_request_id = ?2
           )
           AND last_seen_at < ?3",
        params![local_project_id, local_mr_id, run_seen_at],
    )?;
    if deleted > 0 {
        debug!(local_mr_id, deleted, "Swept stale notes");
    }
    Ok(deleted)
}
|
|
|
|
fn mark_discussions_synced(conn: &Connection, local_mr_id: i64, updated_at: i64) -> Result<()> {
|
|
conn.execute(
|
|
"UPDATE merge_requests SET discussions_synced_for_updated_at = ? WHERE id = ?",
|
|
params![updated_at, local_mr_id],
|
|
)?;
|
|
Ok(())
|
|
}
|
|
|
|
fn record_sync_health_error(conn: &Connection, local_mr_id: i64, error: &str) -> Result<()> {
|
|
conn.execute(
|
|
"UPDATE merge_requests SET
|
|
discussions_sync_last_attempt_at = ?,
|
|
discussions_sync_attempts = discussions_sync_attempts + 1,
|
|
discussions_sync_last_error = ?
|
|
WHERE id = ?",
|
|
params![now_ms(), error, local_mr_id],
|
|
)?;
|
|
Ok(())
|
|
}
|
|
|
|
fn clear_sync_health_error(conn: &Connection, local_mr_id: i64) -> Result<()> {
|
|
conn.execute(
|
|
"UPDATE merge_requests SET
|
|
discussions_sync_last_attempt_at = ?,
|
|
discussions_sync_attempts = 0,
|
|
discussions_sync_last_error = NULL
|
|
WHERE id = ?",
|
|
params![now_ms(), local_mr_id],
|
|
)?;
|
|
Ok(())
|
|
}
|
|
|
|
#[cfg(test)]
mod tests {
    use super::*;
    use crate::core::db::{create_connection, run_migrations};
    use std::path::Path;

    #[test]
    fn result_default_has_zero_counts() {
        // Default derives zero counters and a false success flag.
        let result = IngestMrDiscussionsResult::default();
        assert_eq!(result.discussions_fetched, 0);
        assert_eq!(result.discussions_upserted, 0);
        assert_eq!(result.notes_upserted, 0);
        assert_eq!(result.notes_skipped_bad_timestamp, 0);
        assert_eq!(result.diffnotes_count, 0);
        assert!(!result.pagination_succeeded);
    }

    #[test]
    fn result_pagination_succeeded_false_by_default() {
        let result = IngestMrDiscussionsResult::default();
        assert!(!result.pagination_succeeded);
    }

    /// Builds an in-memory DB with one project, one MR (local id 1), and one
    /// discussion attached to that MR.
    fn setup_mr() -> Connection {
        let conn = create_connection(Path::new(":memory:")).unwrap();
        run_migrations(&conn).unwrap();

        conn.execute(
            "INSERT INTO projects (gitlab_project_id, path_with_namespace, web_url) \
             VALUES (1, 'group/repo', 'https://gitlab.com/group/repo')",
            [],
        )
        .unwrap();

        conn.execute(
            "INSERT INTO merge_requests (gitlab_id, iid, project_id, title, state, \
             author_username, source_branch, target_branch, created_at, updated_at, last_seen_at) \
             VALUES (200, 1, 1, 'Test MR', 'opened', 'testuser', 'feat', 'main', 1000, 2000, 3000)",
            [],
        )
        .unwrap();

        conn.execute(
            "INSERT INTO discussions (gitlab_discussion_id, project_id, merge_request_id, noteable_type, \
             individual_note, last_seen_at, resolvable, resolved) \
             VALUES ('mr-disc-1', 1, 1, 'MergeRequest', 0, 3000, 0, 0)",
            [],
        )
        .unwrap();

        conn
    }

    /// Local id of the single discussion inserted by `setup_mr`.
    fn get_mr_discussion_id(conn: &Connection) -> i64 {
        conn.query_row("SELECT id FROM discussions LIMIT 1", [], |row| row.get(0))
            .unwrap()
    }

    /// Builds a non-system NormalizedNote with no diff position.
    #[allow(clippy::too_many_arguments)]
    fn make_mr_note(
        gitlab_id: i64,
        project_id: i64,
        body: &str,
        note_type: Option<&str>,
        created_at: i64,
        updated_at: i64,
        resolved: bool,
        resolved_by: Option<&str>,
    ) -> NormalizedNote {
        NormalizedNote {
            gitlab_id,
            project_id,
            note_type: note_type.map(String::from),
            is_system: false,
            author_id: None,
            author_username: "testuser".to_string(),
            body: body.to_string(),
            created_at,
            updated_at,
            last_seen_at: updated_at,
            position: 0,
            resolvable: false,
            resolved,
            resolved_by: resolved_by.map(String::from),
            resolved_at: None,
            position_old_path: None,
            position_new_path: None,
            position_old_line: None,
            position_new_line: None,
            position_type: None,
            position_line_range_start: None,
            position_line_range_end: None,
            position_base_sha: None,
            position_start_sha: None,
            position_head_sha: None,
        }
    }

    #[test]
    fn test_mr_note_upsert_captures_author_id() {
        let conn = setup_mr();
        let disc_id = get_mr_discussion_id(&conn);

        let mut note = make_mr_note(8001, 1, "MR note", None, 1000, 2000, false, None);
        note.author_id = Some(12345);

        upsert_note(&conn, disc_id, &note, 5000, None).unwrap();

        // author_id must round-trip through the insert path.
        let stored: Option<i64> = conn
            .query_row(
                "SELECT author_id FROM notes WHERE gitlab_id = ?",
                [8001_i64],
                |row| row.get(0),
            )
            .unwrap();
        assert_eq!(stored, Some(12345));
    }

    /// Inserts a minimal note document row for the given local note id.
    fn insert_note_document(conn: &Connection, note_local_id: i64) {
        conn.execute(
            "INSERT INTO documents (source_type, source_id, project_id, content_text, content_hash) \
             VALUES ('note', ?1, 1, 'note content', 'hash123')",
            [note_local_id],
        )
        .unwrap();
    }

    /// Counts note documents attached to the given local note id.
    fn count_note_documents(conn: &Connection, note_local_id: i64) -> i64 {
        conn.query_row(
            "SELECT COUNT(*) FROM documents WHERE source_type = 'note' AND source_id = ?",
            [note_local_id],
            |row| row.get(0),
        )
        .unwrap()
    }

    #[test]
    fn test_mr_note_sweep_deletes_note_documents_immediately() {
        let conn = setup_mr();
        let disc_id = get_mr_discussion_id(&conn);
        let local_project_id = 1;
        let local_mr_id = 1;

        // Insert 3 notes
        let note1 = make_mr_note(8101, 1, "Keep", None, 1000, 2000, false, None);
        let note2 = make_mr_note(8102, 1, "Keep too", None, 1000, 2000, false, None);
        let note3 = make_mr_note(8103, 1, "Stale", None, 1000, 2000, false, None);

        let out1 = upsert_note(&conn, disc_id, &note1, 5000, None).unwrap();
        let out2 = upsert_note(&conn, disc_id, &note2, 5000, None).unwrap();
        let out3 = upsert_note(&conn, disc_id, &note3, 5000, None).unwrap();

        // Add documents for all 3
        insert_note_document(&conn, out1.local_note_id);
        insert_note_document(&conn, out2.local_note_id);
        insert_note_document(&conn, out3.local_note_id);

        // Re-sync only notes 1 and 2 (note 3's last_seen_at stays at 5000,
        // making it stale relative to the 6000 sweep below).
        upsert_note(&conn, disc_id, &note1, 6000, None).unwrap();
        upsert_note(&conn, disc_id, &note2, 6000, None).unwrap();

        // Sweep stale notes
        sweep_stale_notes(&conn, local_project_id, local_mr_id, 6000).unwrap();

        // Stale note's document should be gone
        assert_eq!(count_note_documents(&conn, out3.local_note_id), 0);

        // Kept notes' documents should survive
        assert_eq!(count_note_documents(&conn, out1.local_note_id), 1);
        assert_eq!(count_note_documents(&conn, out2.local_note_id), 1);
    }
}
|