461 lines
13 KiB
Rust
461 lines
13 KiB
Rust
use std::ops::Deref;
|
|
|
|
use futures::StreamExt;
|
|
use rusqlite::{Connection, Transaction};
|
|
use tracing::{debug, warn};
|
|
|
|
use crate::Config;
|
|
use crate::core::error::{LoreError, Result};
|
|
use crate::core::shutdown::ShutdownSignal;
|
|
use crate::core::time::now_ms;
|
|
use crate::documents::SourceType;
|
|
use crate::gitlab::GitLabClient;
|
|
use crate::gitlab::transformers::{MilestoneRow, transform_issue};
|
|
use crate::gitlab::types::GitLabIssue;
|
|
use crate::ingestion::dirty_tracker;
|
|
use crate::ingestion::storage::payloads::{StorePayloadOptions, store_payload};
|
|
|
|
/// Outcome summary of one [`ingest_issues`] run.
#[derive(Debug, Default)]
pub struct IngestIssuesResult {
    /// Issues received from the GitLab API stream (including skipped ones).
    pub fetched: usize,
    /// Issues actually written (inserted or updated) locally.
    pub upserted: usize,
    /// Label rows newly created while linking issue labels.
    pub labels_created: usize,
    /// Issues whose `updated_at` is newer than their last discussion sync.
    pub issues_needing_discussion_sync: Vec<IssueForDiscussionSync>,
}
|
|
|
|
/// Minimal handle for an issue whose discussions still need syncing.
#[derive(Debug, Clone)]
pub struct IssueForDiscussionSync {
    /// Local (database) `issues.id`, not the GitLab id.
    pub local_issue_id: i64,
    /// Project-scoped issue number (GitLab `iid`), used for API calls.
    pub iid: i64,
    /// Issue `updated_at` in epoch milliseconds.
    pub updated_at: i64,
}
|
|
|
|
/// Incremental-sync position: the newest `updated_at` already processed,
/// plus the GitLab id of the last issue seen at that timestamp so equal
/// timestamps can be disambiguated.
#[derive(Debug, Default)]
struct SyncCursor {
    /// Epoch-ms `updated_at` of the last processed issue; `None` = full sync.
    updated_at_cursor: Option<i64>,
    /// GitLab issue id used to break ties when `updated_at` is equal.
    tie_breaker_id: Option<i64>,
}
|
|
|
|
// Keep comfortably below SQLite's default 999 bind-parameter limit.
// Each batched row binds one parameter on top of the shared `?1` issue id,
// so a 400-row chunk uses at most 401 parameters.
const BATCH_LINK_ROWS_MAX: usize = 400;
|
|
|
|
/// Stream issues for `gitlab_project_id` that changed since the stored sync
/// cursor, upsert each one locally (each in its own transaction via
/// [`process_single_issue`]), and advance the cursor as progress is made.
///
/// Returns counters for the run plus the list of issues whose discussions
/// still need syncing. Errors from the API stream, timestamp-independent DB
/// work, or cursor persistence are propagated; issues with unparsable
/// timestamps are skipped with a warning.
pub async fn ingest_issues(
    conn: &Connection,
    client: &GitLabClient,
    config: &Config,
    project_id: i64,
    gitlab_project_id: i64,
    signal: &ShutdownSignal,
) -> Result<IngestIssuesResult> {
    let mut result = IngestIssuesResult::default();

    let cursor = get_sync_cursor(conn, project_id)?;
    debug!(?cursor, "Starting issue ingestion with cursor");

    // cursor_rewind_seconds widens the fetch window behind the cursor —
    // presumably to re-fetch a safety margin; overlap is filtered out below.
    let mut issues_stream = client.paginate_issues(
        gitlab_project_id,
        cursor.updated_at_cursor,
        config.sync.cursor_rewind_seconds,
    );

    let mut batch_count = 0;
    let mut last_updated_at: Option<i64> = None;
    let mut last_gitlab_id: Option<i64> = None;

    while let Some(issue_result) = issues_stream.next().await {
        // Stop cleanly on shutdown; the cursor persisted so far stays valid.
        if signal.is_cancelled() {
            debug!("Issue ingestion interrupted by shutdown signal");
            break;
        }
        let issue = issue_result?;
        result.fetched += 1;

        // An invalid timestamp skips only this issue, not the whole run.
        let issue_updated_at = match parse_timestamp(&issue.updated_at) {
            Ok(ts) => ts,
            Err(e) => {
                warn!(
                    gitlab_id = issue.id,
                    error = %e,
                    "Skipping issue with invalid timestamp"
                );
                continue;
            }
        };

        // Drop issues already covered by the cursor (rewind overlap).
        if !passes_cursor_filter_with_ts(issue.id, issue_updated_at, &cursor) {
            debug!(gitlab_id = issue.id, "Skipping already-processed issue");
            continue;
        }

        let labels_created = process_single_issue(conn, config, project_id, &issue)?;
        result.upserted += 1;
        result.labels_created += labels_created;

        last_updated_at = Some(issue_updated_at);
        last_gitlab_id = Some(issue.id);
        batch_count += 1;

        // Persist the cursor every 100 upserts so an interruption does not
        // lose the whole run's progress.
        if batch_count % 100 == 0
            && let (Some(ts), Some(id)) = (last_updated_at, last_gitlab_id)
        {
            update_sync_cursor(conn, project_id, ts, id)?;
            debug!(batch_count, "Incremental cursor update");
        }
    }

    // Final cursor flush covering any partial batch since the last flush.
    if let (Some(ts), Some(id)) = (last_updated_at, last_gitlab_id) {
        update_sync_cursor(conn, project_id, ts, id)?;
    } else if result.fetched == 0 && cursor.updated_at_cursor.is_some() {
        debug!("No new issues found, cursor unchanged");
    }

    result.issues_needing_discussion_sync = get_issues_needing_discussion_sync(conn, project_id)?;

    debug!(
        summary = crate::ingestion::nonzero_summary(&[
            ("fetched", result.fetched),
            ("upserted", result.upserted),
            ("labels", result.labels_created),
            ("needing sync", result.issues_needing_discussion_sync.len()),
        ]),
        "Issue ingestion"
    );

    Ok(result)
}
|
|
|
|
fn passes_cursor_filter_with_ts(gitlab_id: i64, issue_ts: i64, cursor: &SyncCursor) -> bool {
|
|
let Some(cursor_ts) = cursor.updated_at_cursor else {
|
|
return true;
|
|
};
|
|
|
|
if issue_ts < cursor_ts {
|
|
return false;
|
|
}
|
|
|
|
if issue_ts == cursor_ts
|
|
&& let Some(cursor_id) = cursor.tie_breaker_id
|
|
&& gitlab_id <= cursor_id
|
|
{
|
|
return false;
|
|
}
|
|
|
|
true
|
|
}
|
|
|
|
pub(crate) fn process_single_issue(
|
|
conn: &Connection,
|
|
config: &Config,
|
|
project_id: i64,
|
|
issue: &GitLabIssue,
|
|
) -> Result<usize> {
|
|
let now = now_ms();
|
|
|
|
let payload_bytes = serde_json::to_vec(issue)?;
|
|
let transformed = transform_issue(issue)?;
|
|
let issue_row = &transformed.issue;
|
|
|
|
let tx = conn.unchecked_transaction()?;
|
|
let labels_created = process_issue_in_transaction(
|
|
&tx,
|
|
config,
|
|
project_id,
|
|
issue,
|
|
&payload_bytes,
|
|
issue_row,
|
|
&transformed.label_names,
|
|
&transformed.assignee_usernames,
|
|
transformed.milestone.as_ref(),
|
|
now,
|
|
)?;
|
|
tx.commit()?;
|
|
|
|
Ok(labels_created)
|
|
}
|
|
|
|
/// Apply all writes for one issue inside the supplied transaction:
/// raw payload, milestone, the issue row itself, dirty-tracking, and the
/// label/assignee link tables.
///
/// Returns the number of label rows newly created.
#[allow(clippy::too_many_arguments)]
fn process_issue_in_transaction(
    tx: &Transaction<'_>,
    config: &Config,
    project_id: i64,
    issue: &GitLabIssue,
    payload_bytes: &[u8],
    issue_row: &crate::gitlab::transformers::IssueRow,
    label_names: &[String],
    assignee_usernames: &[String],
    milestone: Option<&MilestoneRow>,
    now: i64,
) -> Result<usize> {
    let mut labels_created = 0;

    // Store the raw JSON first so the issue row can reference its payload id.
    let payload_id = store_payload(
        tx.deref(),
        StorePayloadOptions {
            project_id: Some(project_id),
            resource_type: "issue",
            gitlab_id: &issue.id.to_string(),
            json_bytes: payload_bytes,
            compress: config.storage.compress_raw_payloads,
        },
    )?;

    // Upsert the milestone (if any) to obtain the local id for the FK below.
    let milestone_id: Option<i64> = if let Some(m) = milestone {
        Some(upsert_milestone_tx(tx, project_id, m)?)
    } else {
        None
    };

    // Upsert keyed on gitlab_id; iid/created_at are only set on first insert.
    tx.execute(
        "INSERT INTO issues (
            gitlab_id, project_id, iid, title, description, state,
            author_username, created_at, updated_at, last_seen_at, web_url,
            due_date, milestone_id, milestone_title, raw_payload_id
        ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15)
        ON CONFLICT(gitlab_id) DO UPDATE SET
            title = excluded.title,
            description = excluded.description,
            state = excluded.state,
            author_username = excluded.author_username,
            updated_at = excluded.updated_at,
            last_seen_at = excluded.last_seen_at,
            web_url = excluded.web_url,
            due_date = excluded.due_date,
            milestone_id = excluded.milestone_id,
            milestone_title = excluded.milestone_title,
            raw_payload_id = excluded.raw_payload_id",
        (
            issue_row.gitlab_id,
            project_id,
            issue_row.iid,
            &issue_row.title,
            &issue_row.description,
            &issue_row.state,
            &issue_row.author_username,
            issue_row.created_at,
            issue_row.updated_at,
            now,
            &issue_row.web_url,
            &issue_row.due_date,
            milestone_id,
            &issue_row.milestone_title,
            payload_id,
        ),
    )?;

    // The upsert does not return the local id, so look it up by the
    // (project_id, iid) pair.
    let local_issue_id: i64 = tx.query_row(
        "SELECT id FROM issues WHERE project_id = ? AND iid = ?",
        (project_id, issue_row.iid),
        |row| row.get(0),
    )?;

    // Flag the issue for downstream processing of changed rows.
    dirty_tracker::mark_dirty_tx(tx, SourceType::Issue, local_issue_id)?;

    // Labels: replace the full link set (delete, then batch re-insert).
    tx.execute(
        "DELETE FROM issue_labels WHERE issue_id = ?",
        [local_issue_id],
    )?;

    let mut label_ids = Vec::with_capacity(label_names.len());
    for label_name in label_names {
        let label_id = upsert_label_tx(tx, project_id, label_name, &mut labels_created)?;
        label_ids.push(label_id);
    }
    link_issue_labels_batch_tx(tx, local_issue_id, &label_ids)?;

    // Assignees: same replace-all strategy.
    tx.execute(
        "DELETE FROM issue_assignees WHERE issue_id = ?",
        [local_issue_id],
    )?;

    insert_issue_assignees_batch_tx(tx, local_issue_id, assignee_usernames)?;

    Ok(labels_created)
}
|
|
|
|
fn upsert_label_tx(
|
|
tx: &Transaction<'_>,
|
|
project_id: i64,
|
|
name: &str,
|
|
created_count: &mut usize,
|
|
) -> Result<i64> {
|
|
tx.execute(
|
|
"INSERT OR IGNORE INTO labels (project_id, name) VALUES (?1, ?2)",
|
|
(project_id, name),
|
|
)?;
|
|
|
|
if tx.changes() > 0 {
|
|
*created_count += 1;
|
|
}
|
|
|
|
let id: i64 = tx.query_row(
|
|
"SELECT id FROM labels WHERE project_id = ?1 AND name = ?2",
|
|
(project_id, name),
|
|
|row| row.get(0),
|
|
)?;
|
|
|
|
Ok(id)
|
|
}
|
|
|
|
fn link_issue_labels_batch_tx(
|
|
tx: &Transaction<'_>,
|
|
issue_id: i64,
|
|
label_ids: &[i64],
|
|
) -> Result<()> {
|
|
if label_ids.is_empty() {
|
|
return Ok(());
|
|
}
|
|
|
|
for chunk in label_ids.chunks(BATCH_LINK_ROWS_MAX) {
|
|
let placeholders = (0..chunk.len())
|
|
.map(|idx| format!("(?1, ?{})", idx + 2))
|
|
.collect::<Vec<_>>()
|
|
.join(", ");
|
|
let sql = format!(
|
|
"INSERT OR IGNORE INTO issue_labels (issue_id, label_id) VALUES {}",
|
|
placeholders
|
|
);
|
|
|
|
let mut params: Vec<&dyn rusqlite::types::ToSql> = Vec::with_capacity(chunk.len() + 1);
|
|
params.push(&issue_id);
|
|
for label_id in chunk {
|
|
params.push(label_id);
|
|
}
|
|
|
|
tx.execute(&sql, params.as_slice())?;
|
|
}
|
|
|
|
Ok(())
|
|
}
|
|
|
|
fn insert_issue_assignees_batch_tx(
|
|
tx: &Transaction<'_>,
|
|
issue_id: i64,
|
|
usernames: &[String],
|
|
) -> Result<()> {
|
|
if usernames.is_empty() {
|
|
return Ok(());
|
|
}
|
|
|
|
for chunk in usernames.chunks(BATCH_LINK_ROWS_MAX) {
|
|
let placeholders = (0..chunk.len())
|
|
.map(|idx| format!("(?1, ?{})", idx + 2))
|
|
.collect::<Vec<_>>()
|
|
.join(", ");
|
|
let sql = format!(
|
|
"INSERT OR IGNORE INTO issue_assignees (issue_id, username) VALUES {}",
|
|
placeholders
|
|
);
|
|
|
|
let mut params: Vec<&dyn rusqlite::types::ToSql> = Vec::with_capacity(chunk.len() + 1);
|
|
params.push(&issue_id);
|
|
for username in chunk {
|
|
params.push(username);
|
|
}
|
|
|
|
tx.execute(&sql, params.as_slice())?;
|
|
}
|
|
|
|
Ok(())
|
|
}
|
|
|
|
/// Insert or update a milestone row and return its local `id`.
///
/// A single `ON CONFLICT … DO UPDATE … RETURNING id` statement covers both
/// the insert and the update path.
fn upsert_milestone_tx(
    tx: &Transaction<'_>,
    project_id: i64,
    milestone: &MilestoneRow,
) -> Result<i64> {
    let local_id: i64 = tx.query_row(
        "INSERT INTO milestones (gitlab_id, project_id, iid, title, description, state, due_date, web_url)
         VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)
         ON CONFLICT(project_id, gitlab_id) DO UPDATE SET
             iid = excluded.iid,
             title = excluded.title,
             description = excluded.description,
             state = excluded.state,
             due_date = excluded.due_date,
             web_url = excluded.web_url
         RETURNING id",
        (
            milestone.gitlab_id,
            project_id,
            milestone.iid,
            &milestone.title,
            &milestone.description,
            &milestone.state,
            &milestone.due_date,
            &milestone.web_url,
        ),
        |row| row.get(0),
    )?;

    Ok(local_id)
}
|
|
|
|
fn get_sync_cursor(conn: &Connection, project_id: i64) -> Result<SyncCursor> {
|
|
let row: Option<(Option<i64>, Option<i64>)> = conn
|
|
.query_row(
|
|
"SELECT updated_at_cursor, tie_breaker_id FROM sync_cursors
|
|
WHERE project_id = ? AND resource_type = 'issues'",
|
|
[project_id],
|
|
|row| Ok((row.get(0)?, row.get(1)?)),
|
|
)
|
|
.ok();
|
|
|
|
Ok(match row {
|
|
Some((updated_at, tie_breaker)) => SyncCursor {
|
|
updated_at_cursor: updated_at,
|
|
tie_breaker_id: tie_breaker,
|
|
},
|
|
None => SyncCursor::default(),
|
|
})
|
|
}
|
|
|
|
/// Persist the issue sync cursor (newest `updated_at` plus the GitLab-id
/// tie-breaker) for this project, creating the row on first use.
fn update_sync_cursor(
    conn: &Connection,
    project_id: i64,
    updated_at: i64,
    gitlab_id: i64,
) -> Result<()> {
    conn.execute(
        "INSERT INTO sync_cursors (project_id, resource_type, updated_at_cursor, tie_breaker_id)
         VALUES (?1, 'issues', ?2, ?3)
         ON CONFLICT(project_id, resource_type) DO UPDATE SET
             updated_at_cursor = excluded.updated_at_cursor,
             tie_breaker_id = excluded.tie_breaker_id",
        (project_id, updated_at, gitlab_id),
    )?;
    Ok(())
}
|
|
|
|
fn get_issues_needing_discussion_sync(
|
|
conn: &Connection,
|
|
project_id: i64,
|
|
) -> Result<Vec<IssueForDiscussionSync>> {
|
|
let mut stmt = conn.prepare(
|
|
"SELECT id, iid, updated_at FROM issues
|
|
WHERE project_id = ?
|
|
AND updated_at > COALESCE(discussions_synced_for_updated_at, 0)",
|
|
)?;
|
|
|
|
let issues: std::result::Result<Vec<_>, _> = stmt
|
|
.query_map([project_id], |row| {
|
|
Ok(IssueForDiscussionSync {
|
|
local_issue_id: row.get(0)?,
|
|
iid: row.get(1)?,
|
|
updated_at: row.get(2)?,
|
|
})
|
|
})?
|
|
.collect();
|
|
|
|
Ok(issues?)
|
|
}
|
|
|
|
fn parse_timestamp(ts: &str) -> Result<i64> {
|
|
chrono::DateTime::parse_from_rfc3339(ts)
|
|
.map(|dt| dt.timestamp_millis())
|
|
.map_err(|e| LoreError::Other(format!("Failed to parse timestamp '{}': {}", ts, e)))
|
|
}
|
|
|
|
// Unit tests live in the sibling file `issues_tests.rs`; compiled only for
// test builds.
#[cfg(test)]
#[path = "issues_tests.rs"]
mod tests;
|