Compare commits
3 Commits
bb75a9d228
...
deafa88af5
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
deafa88af5 | ||
|
|
880ad1d3fa | ||
|
|
4c0123426a |
@@ -56,8 +56,8 @@ pub fn enqueue_job(
|
|||||||
|
|
||||||
/// Claim a batch of jobs for processing.
|
/// Claim a batch of jobs for processing.
|
||||||
///
|
///
|
||||||
/// Atomically sets `locked_at` on the claimed jobs. Only claims jobs where
|
/// Atomically selects and locks jobs within a transaction. Only claims jobs
|
||||||
/// `locked_at IS NULL` and `(next_retry_at IS NULL OR next_retry_at <= now)`.
|
/// where `locked_at IS NULL` and `(next_retry_at IS NULL OR next_retry_at <= now)`.
|
||||||
pub fn claim_jobs(conn: &Connection, job_type: &str, batch_size: usize) -> Result<Vec<PendingJob>> {
|
pub fn claim_jobs(conn: &Connection, job_type: &str, batch_size: usize) -> Result<Vec<PendingJob>> {
|
||||||
if batch_size == 0 {
|
if batch_size == 0 {
|
||||||
return Ok(Vec::new());
|
return Ok(Vec::new());
|
||||||
@@ -65,19 +65,25 @@ pub fn claim_jobs(conn: &Connection, job_type: &str, batch_size: usize) -> Resul
|
|||||||
|
|
||||||
let now = now_ms();
|
let now = now_ms();
|
||||||
|
|
||||||
// Find available jobs
|
// Use UPDATE ... RETURNING to atomically select and lock in one statement.
|
||||||
let mut select_stmt = conn.prepare_cached(
|
// This eliminates the race between SELECT and UPDATE.
|
||||||
"SELECT id, project_id, entity_type, entity_iid, entity_local_id, job_type, payload_json, attempts
|
let mut stmt = conn.prepare_cached(
|
||||||
FROM pending_dependent_fetches
|
"UPDATE pending_dependent_fetches
|
||||||
WHERE job_type = ?1
|
SET locked_at = ?1
|
||||||
AND locked_at IS NULL
|
WHERE id IN (
|
||||||
AND (next_retry_at IS NULL OR next_retry_at <= ?2)
|
SELECT id FROM pending_dependent_fetches
|
||||||
ORDER BY enqueued_at ASC
|
WHERE job_type = ?2
|
||||||
LIMIT ?3",
|
AND locked_at IS NULL
|
||||||
|
AND (next_retry_at IS NULL OR next_retry_at <= ?1)
|
||||||
|
ORDER BY enqueued_at ASC
|
||||||
|
LIMIT ?3
|
||||||
|
)
|
||||||
|
RETURNING id, project_id, entity_type, entity_iid, entity_local_id,
|
||||||
|
job_type, payload_json, attempts",
|
||||||
)?;
|
)?;
|
||||||
|
|
||||||
let jobs: Vec<PendingJob> = select_stmt
|
let jobs = stmt
|
||||||
.query_map(rusqlite::params![job_type, now, batch_size as i64], |row| {
|
.query_map(rusqlite::params![now, job_type, batch_size as i64], |row| {
|
||||||
Ok(PendingJob {
|
Ok(PendingJob {
|
||||||
id: row.get(0)?,
|
id: row.get(0)?,
|
||||||
project_id: row.get(1)?,
|
project_id: row.get(1)?,
|
||||||
@@ -91,18 +97,6 @@ pub fn claim_jobs(conn: &Connection, job_type: &str, batch_size: usize) -> Resul
|
|||||||
})?
|
})?
|
||||||
.collect::<std::result::Result<Vec<_>, _>>()?;
|
.collect::<std::result::Result<Vec<_>, _>>()?;
|
||||||
|
|
||||||
// Lock the claimed jobs
|
|
||||||
if !jobs.is_empty() {
|
|
||||||
let ids: Vec<String> = jobs.iter().map(|j| j.id.to_string()).collect();
|
|
||||||
let placeholders = ids.join(",");
|
|
||||||
conn.execute(
|
|
||||||
&format!(
|
|
||||||
"UPDATE pending_dependent_fetches SET locked_at = ?1 WHERE id IN ({placeholders})"
|
|
||||||
),
|
|
||||||
rusqlite::params![now],
|
|
||||||
)?;
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(jobs)
|
Ok(jobs)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -9,9 +9,9 @@ use crate::gitlab::types::{GitLabLabelEvent, GitLabMilestoneEvent, GitLabStateEv
|
|||||||
/// Upsert state events for an entity.
|
/// Upsert state events for an entity.
|
||||||
///
|
///
|
||||||
/// Uses INSERT OR REPLACE keyed on UNIQUE(gitlab_id, project_id).
|
/// Uses INSERT OR REPLACE keyed on UNIQUE(gitlab_id, project_id).
|
||||||
/// Wraps in a savepoint for atomicity per entity.
|
/// Caller is responsible for wrapping in a transaction if atomicity is needed.
|
||||||
pub fn upsert_state_events(
|
pub fn upsert_state_events(
|
||||||
conn: &mut Connection,
|
conn: &Connection,
|
||||||
project_id: i64,
|
project_id: i64,
|
||||||
entity_type: &str,
|
entity_type: &str,
|
||||||
entity_local_id: i64,
|
entity_local_id: i64,
|
||||||
@@ -19,9 +19,7 @@ pub fn upsert_state_events(
|
|||||||
) -> Result<usize> {
|
) -> Result<usize> {
|
||||||
let (issue_id, merge_request_id) = resolve_entity_ids(entity_type, entity_local_id)?;
|
let (issue_id, merge_request_id) = resolve_entity_ids(entity_type, entity_local_id)?;
|
||||||
|
|
||||||
let sp = conn.savepoint()?;
|
let mut stmt = conn.prepare_cached(
|
||||||
|
|
||||||
let mut stmt = sp.prepare_cached(
|
|
||||||
"INSERT OR REPLACE INTO resource_state_events
|
"INSERT OR REPLACE INTO resource_state_events
|
||||||
(gitlab_id, project_id, issue_id, merge_request_id, state,
|
(gitlab_id, project_id, issue_id, merge_request_id, state,
|
||||||
actor_gitlab_id, actor_username, created_at,
|
actor_gitlab_id, actor_username, created_at,
|
||||||
@@ -51,15 +49,13 @@ pub fn upsert_state_events(
|
|||||||
count += 1;
|
count += 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
drop(stmt);
|
|
||||||
sp.commit()?;
|
|
||||||
|
|
||||||
Ok(count)
|
Ok(count)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Upsert label events for an entity.
|
/// Upsert label events for an entity.
|
||||||
|
/// Caller is responsible for wrapping in a transaction if atomicity is needed.
|
||||||
pub fn upsert_label_events(
|
pub fn upsert_label_events(
|
||||||
conn: &mut Connection,
|
conn: &Connection,
|
||||||
project_id: i64,
|
project_id: i64,
|
||||||
entity_type: &str,
|
entity_type: &str,
|
||||||
entity_local_id: i64,
|
entity_local_id: i64,
|
||||||
@@ -67,9 +63,7 @@ pub fn upsert_label_events(
|
|||||||
) -> Result<usize> {
|
) -> Result<usize> {
|
||||||
let (issue_id, merge_request_id) = resolve_entity_ids(entity_type, entity_local_id)?;
|
let (issue_id, merge_request_id) = resolve_entity_ids(entity_type, entity_local_id)?;
|
||||||
|
|
||||||
let sp = conn.savepoint()?;
|
let mut stmt = conn.prepare_cached(
|
||||||
|
|
||||||
let mut stmt = sp.prepare_cached(
|
|
||||||
"INSERT OR REPLACE INTO resource_label_events
|
"INSERT OR REPLACE INTO resource_label_events
|
||||||
(gitlab_id, project_id, issue_id, merge_request_id, action,
|
(gitlab_id, project_id, issue_id, merge_request_id, action,
|
||||||
label_name, actor_gitlab_id, actor_username, created_at)
|
label_name, actor_gitlab_id, actor_username, created_at)
|
||||||
@@ -96,15 +90,13 @@ pub fn upsert_label_events(
|
|||||||
count += 1;
|
count += 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
drop(stmt);
|
|
||||||
sp.commit()?;
|
|
||||||
|
|
||||||
Ok(count)
|
Ok(count)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Upsert milestone events for an entity.
|
/// Upsert milestone events for an entity.
|
||||||
|
/// Caller is responsible for wrapping in a transaction if atomicity is needed.
|
||||||
pub fn upsert_milestone_events(
|
pub fn upsert_milestone_events(
|
||||||
conn: &mut Connection,
|
conn: &Connection,
|
||||||
project_id: i64,
|
project_id: i64,
|
||||||
entity_type: &str,
|
entity_type: &str,
|
||||||
entity_local_id: i64,
|
entity_local_id: i64,
|
||||||
@@ -112,9 +104,7 @@ pub fn upsert_milestone_events(
|
|||||||
) -> Result<usize> {
|
) -> Result<usize> {
|
||||||
let (issue_id, merge_request_id) = resolve_entity_ids(entity_type, entity_local_id)?;
|
let (issue_id, merge_request_id) = resolve_entity_ids(entity_type, entity_local_id)?;
|
||||||
|
|
||||||
let sp = conn.savepoint()?;
|
let mut stmt = conn.prepare_cached(
|
||||||
|
|
||||||
let mut stmt = sp.prepare_cached(
|
|
||||||
"INSERT OR REPLACE INTO resource_milestone_events
|
"INSERT OR REPLACE INTO resource_milestone_events
|
||||||
(gitlab_id, project_id, issue_id, merge_request_id, action,
|
(gitlab_id, project_id, issue_id, merge_request_id, action,
|
||||||
milestone_title, milestone_id, actor_gitlab_id, actor_username, created_at)
|
milestone_title, milestone_id, actor_gitlab_id, actor_username, created_at)
|
||||||
@@ -142,9 +132,6 @@ pub fn upsert_milestone_events(
|
|||||||
count += 1;
|
count += 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
drop(stmt);
|
|
||||||
sp.commit()?;
|
|
||||||
|
|
||||||
Ok(count)
|
Ok(count)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -166,12 +166,12 @@ pub fn extract_issue_document(conn: &Connection, issue_id: i64) -> Result<Option
|
|||||||
content.push_str(desc);
|
content.push_str(desc);
|
||||||
}
|
}
|
||||||
|
|
||||||
let content_hash = compute_content_hash(&content);
|
|
||||||
let labels_hash = compute_list_hash(&labels);
|
let labels_hash = compute_list_hash(&labels);
|
||||||
let paths_hash = compute_list_hash(&[]); // Issues have no paths
|
let paths_hash = compute_list_hash(&[]); // Issues have no paths
|
||||||
|
|
||||||
// Apply hard cap truncation for safety
|
// Apply hard cap truncation for safety, then hash the final stored content
|
||||||
let hard_cap = truncate_hard_cap(&content);
|
let hard_cap = truncate_hard_cap(&content);
|
||||||
|
let content_hash = compute_content_hash(&hard_cap.content);
|
||||||
|
|
||||||
Ok(Some(DocumentData {
|
Ok(Some(DocumentData {
|
||||||
source_type: SourceType::Issue,
|
source_type: SourceType::Issue,
|
||||||
@@ -281,12 +281,12 @@ pub fn extract_mr_document(conn: &Connection, mr_id: i64) -> Result<Option<Docum
|
|||||||
content.push_str(desc);
|
content.push_str(desc);
|
||||||
}
|
}
|
||||||
|
|
||||||
let content_hash = compute_content_hash(&content);
|
|
||||||
let labels_hash = compute_list_hash(&labels);
|
let labels_hash = compute_list_hash(&labels);
|
||||||
let paths_hash = compute_list_hash(&[]);
|
let paths_hash = compute_list_hash(&[]);
|
||||||
|
|
||||||
// Apply hard cap truncation for safety
|
// Apply hard cap truncation for safety, then hash the final stored content
|
||||||
let hard_cap = truncate_hard_cap(&content);
|
let hard_cap = truncate_hard_cap(&content);
|
||||||
|
let content_hash = compute_content_hash(&hard_cap.content);
|
||||||
|
|
||||||
Ok(Some(DocumentData {
|
Ok(Some(DocumentData {
|
||||||
source_type: SourceType::MergeRequest,
|
source_type: SourceType::MergeRequest,
|
||||||
|
|||||||
@@ -662,7 +662,7 @@ impl GitLabClient {
|
|||||||
self.fetch_all_pages(&path).await
|
self.fetch_all_pages(&path).await
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Fetch all three event types for an entity in one call.
|
/// Fetch all three event types for an entity concurrently.
|
||||||
pub async fn fetch_all_resource_events(
|
pub async fn fetch_all_resource_events(
|
||||||
&self,
|
&self,
|
||||||
gitlab_project_id: i64,
|
gitlab_project_id: i64,
|
||||||
@@ -675,23 +675,19 @@ impl GitLabClient {
|
|||||||
)> {
|
)> {
|
||||||
match entity_type {
|
match entity_type {
|
||||||
"issue" => {
|
"issue" => {
|
||||||
let state = self
|
let (state, label, milestone) = tokio::try_join!(
|
||||||
.fetch_issue_state_events(gitlab_project_id, iid)
|
self.fetch_issue_state_events(gitlab_project_id, iid),
|
||||||
.await?;
|
self.fetch_issue_label_events(gitlab_project_id, iid),
|
||||||
let label = self
|
self.fetch_issue_milestone_events(gitlab_project_id, iid),
|
||||||
.fetch_issue_label_events(gitlab_project_id, iid)
|
)?;
|
||||||
.await?;
|
|
||||||
let milestone = self
|
|
||||||
.fetch_issue_milestone_events(gitlab_project_id, iid)
|
|
||||||
.await?;
|
|
||||||
Ok((state, label, milestone))
|
Ok((state, label, milestone))
|
||||||
}
|
}
|
||||||
"merge_request" => {
|
"merge_request" => {
|
||||||
let state = self.fetch_mr_state_events(gitlab_project_id, iid).await?;
|
let (state, label, milestone) = tokio::try_join!(
|
||||||
let label = self.fetch_mr_label_events(gitlab_project_id, iid).await?;
|
self.fetch_mr_state_events(gitlab_project_id, iid),
|
||||||
let milestone = self
|
self.fetch_mr_label_events(gitlab_project_id, iid),
|
||||||
.fetch_mr_milestone_events(gitlab_project_id, iid)
|
self.fetch_mr_milestone_events(gitlab_project_id, iid),
|
||||||
.await?;
|
)?;
|
||||||
Ok((state, label, milestone))
|
Ok((state, label, milestone))
|
||||||
}
|
}
|
||||||
_ => Err(LoreError::Other(format!(
|
_ => Err(LoreError::Other(format!(
|
||||||
|
|||||||
@@ -540,36 +540,31 @@ async fn drain_resource_events(
|
|||||||
});
|
});
|
||||||
|
|
||||||
let mut processed = 0;
|
let mut processed = 0;
|
||||||
|
let mut seen_job_ids = std::collections::HashSet::new();
|
||||||
// Max iterations guard: prevent infinite loop if jobs keep failing and retrying
|
|
||||||
// within the same drain run. Allow 2x total_pending iterations as safety margin.
|
|
||||||
let max_iterations = total_pending * 2;
|
|
||||||
let mut iterations = 0;
|
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
if iterations >= max_iterations {
|
|
||||||
warn!(
|
|
||||||
iterations,
|
|
||||||
total_pending, "Resource events drain hit max iterations guard, stopping"
|
|
||||||
);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
let jobs = claim_jobs(conn, "resource_events", batch_size)?;
|
let jobs = claim_jobs(conn, "resource_events", batch_size)?;
|
||||||
if jobs.is_empty() {
|
if jobs.is_empty() {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
for job in &jobs {
|
for job in &jobs {
|
||||||
iterations += 1;
|
// Guard against re-processing a job that was failed and re-claimed
|
||||||
|
// within the same drain run (shouldn't happen due to backoff, but
|
||||||
|
// defensive against clock skew or zero-backoff edge cases).
|
||||||
|
if !seen_job_ids.insert(job.id) {
|
||||||
|
warn!(
|
||||||
|
job_id = job.id,
|
||||||
|
"Skipping already-processed job in same drain run"
|
||||||
|
);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
match client
|
match client
|
||||||
.fetch_all_resource_events(gitlab_project_id, &job.entity_type, job.entity_iid)
|
.fetch_all_resource_events(gitlab_project_id, &job.entity_type, job.entity_iid)
|
||||||
.await
|
.await
|
||||||
{
|
{
|
||||||
Ok((state_events, label_events, milestone_events)) => {
|
Ok((state_events, label_events, milestone_events)) => {
|
||||||
// Store events - we need &mut Connection for savepoints in upsert functions.
|
|
||||||
// Use unchecked_transaction as a workaround since we have &Connection.
|
|
||||||
let store_result = store_resource_events(
|
let store_result = store_resource_events(
|
||||||
conn,
|
conn,
|
||||||
job.project_id,
|
job.project_id,
|
||||||
@@ -635,8 +630,7 @@ async fn drain_resource_events(
|
|||||||
|
|
||||||
/// Store fetched resource events in the database.
|
/// Store fetched resource events in the database.
|
||||||
///
|
///
|
||||||
/// Uses unchecked_transaction to work with &Connection (not &mut Connection),
|
/// Wraps all three event types in a single transaction for atomicity.
|
||||||
/// which is safe because we're single-threaded and using WAL mode.
|
|
||||||
fn store_resource_events(
|
fn store_resource_events(
|
||||||
conn: &Connection,
|
conn: &Connection,
|
||||||
project_id: i64,
|
project_id: i64,
|
||||||
@@ -646,23 +640,30 @@ fn store_resource_events(
|
|||||||
label_events: &[crate::gitlab::types::GitLabLabelEvent],
|
label_events: &[crate::gitlab::types::GitLabLabelEvent],
|
||||||
milestone_events: &[crate::gitlab::types::GitLabMilestoneEvent],
|
milestone_events: &[crate::gitlab::types::GitLabMilestoneEvent],
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
// The upsert functions require &mut Connection for savepoints.
|
|
||||||
// We use unchecked_transaction to wrap all three upserts atomically,
|
|
||||||
// then call the upsert functions using the transaction's inner connection.
|
|
||||||
let tx = conn.unchecked_transaction()?;
|
let tx = conn.unchecked_transaction()?;
|
||||||
|
|
||||||
// State events - use raw SQL within transaction instead of upsert_state_events
|
|
||||||
// which requires &mut Connection
|
|
||||||
if !state_events.is_empty() {
|
if !state_events.is_empty() {
|
||||||
store_state_events_tx(&tx, project_id, entity_type, entity_local_id, state_events)?;
|
crate::core::events_db::upsert_state_events(
|
||||||
|
&tx,
|
||||||
|
project_id,
|
||||||
|
entity_type,
|
||||||
|
entity_local_id,
|
||||||
|
state_events,
|
||||||
|
)?;
|
||||||
}
|
}
|
||||||
|
|
||||||
if !label_events.is_empty() {
|
if !label_events.is_empty() {
|
||||||
store_label_events_tx(&tx, project_id, entity_type, entity_local_id, label_events)?;
|
crate::core::events_db::upsert_label_events(
|
||||||
|
&tx,
|
||||||
|
project_id,
|
||||||
|
entity_type,
|
||||||
|
entity_local_id,
|
||||||
|
label_events,
|
||||||
|
)?;
|
||||||
}
|
}
|
||||||
|
|
||||||
if !milestone_events.is_empty() {
|
if !milestone_events.is_empty() {
|
||||||
store_milestone_events_tx(
|
crate::core::events_db::upsert_milestone_events(
|
||||||
&tx,
|
&tx,
|
||||||
project_id,
|
project_id,
|
||||||
entity_type,
|
entity_type,
|
||||||
@@ -675,139 +676,6 @@ fn store_resource_events(
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Store state events within an existing transaction.
|
|
||||||
fn store_state_events_tx(
|
|
||||||
tx: &rusqlite::Transaction<'_>,
|
|
||||||
project_id: i64,
|
|
||||||
entity_type: &str,
|
|
||||||
entity_local_id: i64,
|
|
||||||
events: &[crate::gitlab::types::GitLabStateEvent],
|
|
||||||
) -> Result<()> {
|
|
||||||
let (issue_id, merge_request_id): (Option<i64>, Option<i64>) = match entity_type {
|
|
||||||
"issue" => (Some(entity_local_id), None),
|
|
||||||
"merge_request" => (None, Some(entity_local_id)),
|
|
||||||
_ => return Ok(()),
|
|
||||||
};
|
|
||||||
|
|
||||||
let mut stmt = tx.prepare_cached(
|
|
||||||
"INSERT OR REPLACE INTO resource_state_events
|
|
||||||
(gitlab_id, project_id, issue_id, merge_request_id, state,
|
|
||||||
actor_gitlab_id, actor_username, created_at,
|
|
||||||
source_commit, source_merge_request_iid)
|
|
||||||
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10)",
|
|
||||||
)?;
|
|
||||||
|
|
||||||
for event in events {
|
|
||||||
let created_at = crate::core::time::iso_to_ms_strict(&event.created_at)
|
|
||||||
.map_err(crate::core::error::LoreError::Other)?;
|
|
||||||
let actor_id = event.user.as_ref().map(|u| u.id);
|
|
||||||
let actor_username = event.user.as_ref().map(|u| u.username.as_str());
|
|
||||||
let source_mr_iid = event.source_merge_request.as_ref().map(|mr| mr.iid);
|
|
||||||
|
|
||||||
stmt.execute(rusqlite::params![
|
|
||||||
event.id,
|
|
||||||
project_id,
|
|
||||||
issue_id,
|
|
||||||
merge_request_id,
|
|
||||||
event.state,
|
|
||||||
actor_id,
|
|
||||||
actor_username,
|
|
||||||
created_at,
|
|
||||||
event.source_commit,
|
|
||||||
source_mr_iid,
|
|
||||||
])?;
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Store label events within an existing transaction.
|
|
||||||
fn store_label_events_tx(
|
|
||||||
tx: &rusqlite::Transaction<'_>,
|
|
||||||
project_id: i64,
|
|
||||||
entity_type: &str,
|
|
||||||
entity_local_id: i64,
|
|
||||||
events: &[crate::gitlab::types::GitLabLabelEvent],
|
|
||||||
) -> Result<()> {
|
|
||||||
let (issue_id, merge_request_id): (Option<i64>, Option<i64>) = match entity_type {
|
|
||||||
"issue" => (Some(entity_local_id), None),
|
|
||||||
"merge_request" => (None, Some(entity_local_id)),
|
|
||||||
_ => return Ok(()),
|
|
||||||
};
|
|
||||||
|
|
||||||
let mut stmt = tx.prepare_cached(
|
|
||||||
"INSERT OR REPLACE INTO resource_label_events
|
|
||||||
(gitlab_id, project_id, issue_id, merge_request_id, action,
|
|
||||||
label_name, actor_gitlab_id, actor_username, created_at)
|
|
||||||
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)",
|
|
||||||
)?;
|
|
||||||
|
|
||||||
for event in events {
|
|
||||||
let created_at = crate::core::time::iso_to_ms_strict(&event.created_at)
|
|
||||||
.map_err(crate::core::error::LoreError::Other)?;
|
|
||||||
let actor_id = event.user.as_ref().map(|u| u.id);
|
|
||||||
let actor_username = event.user.as_ref().map(|u| u.username.as_str());
|
|
||||||
|
|
||||||
stmt.execute(rusqlite::params![
|
|
||||||
event.id,
|
|
||||||
project_id,
|
|
||||||
issue_id,
|
|
||||||
merge_request_id,
|
|
||||||
event.action,
|
|
||||||
event.label.name,
|
|
||||||
actor_id,
|
|
||||||
actor_username,
|
|
||||||
created_at,
|
|
||||||
])?;
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Store milestone events within an existing transaction.
|
|
||||||
fn store_milestone_events_tx(
|
|
||||||
tx: &rusqlite::Transaction<'_>,
|
|
||||||
project_id: i64,
|
|
||||||
entity_type: &str,
|
|
||||||
entity_local_id: i64,
|
|
||||||
events: &[crate::gitlab::types::GitLabMilestoneEvent],
|
|
||||||
) -> Result<()> {
|
|
||||||
let (issue_id, merge_request_id): (Option<i64>, Option<i64>) = match entity_type {
|
|
||||||
"issue" => (Some(entity_local_id), None),
|
|
||||||
"merge_request" => (None, Some(entity_local_id)),
|
|
||||||
_ => return Ok(()),
|
|
||||||
};
|
|
||||||
|
|
||||||
let mut stmt = tx.prepare_cached(
|
|
||||||
"INSERT OR REPLACE INTO resource_milestone_events
|
|
||||||
(gitlab_id, project_id, issue_id, merge_request_id, action,
|
|
||||||
milestone_title, milestone_id, actor_gitlab_id, actor_username, created_at)
|
|
||||||
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10)",
|
|
||||||
)?;
|
|
||||||
|
|
||||||
for event in events {
|
|
||||||
let created_at = crate::core::time::iso_to_ms_strict(&event.created_at)
|
|
||||||
.map_err(crate::core::error::LoreError::Other)?;
|
|
||||||
let actor_id = event.user.as_ref().map(|u| u.id);
|
|
||||||
let actor_username = event.user.as_ref().map(|u| u.username.as_str());
|
|
||||||
|
|
||||||
stmt.execute(rusqlite::params![
|
|
||||||
event.id,
|
|
||||||
project_id,
|
|
||||||
issue_id,
|
|
||||||
merge_request_id,
|
|
||||||
event.action,
|
|
||||||
event.milestone.title,
|
|
||||||
event.milestone.id,
|
|
||||||
actor_id,
|
|
||||||
actor_username,
|
|
||||||
created_at,
|
|
||||||
])?;
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use super::*;
|
use super::*;
|
||||||
|
|||||||
@@ -72,8 +72,8 @@ async fn main() {
|
|||||||
let quiet = cli.quiet;
|
let quiet = cli.quiet;
|
||||||
|
|
||||||
let result = match cli.command {
|
let result = match cli.command {
|
||||||
Commands::Issues(args) => handle_issues(cli.config.as_deref(), args, robot_mode).await,
|
Commands::Issues(args) => handle_issues(cli.config.as_deref(), args, robot_mode),
|
||||||
Commands::Mrs(args) => handle_mrs(cli.config.as_deref(), args, robot_mode).await,
|
Commands::Mrs(args) => handle_mrs(cli.config.as_deref(), args, robot_mode),
|
||||||
Commands::Search(args) => handle_search(cli.config.as_deref(), args, robot_mode).await,
|
Commands::Search(args) => handle_search(cli.config.as_deref(), args, robot_mode).await,
|
||||||
Commands::Stats(args) => handle_stats(cli.config.as_deref(), args, robot_mode).await,
|
Commands::Stats(args) => handle_stats(cli.config.as_deref(), args, robot_mode).await,
|
||||||
Commands::Embed(args) => handle_embed(cli.config.as_deref(), args, robot_mode).await,
|
Commands::Embed(args) => handle_embed(cli.config.as_deref(), args, robot_mode).await,
|
||||||
@@ -284,7 +284,7 @@ fn handle_error(e: Box<dyn std::error::Error>, robot_mode: bool) -> ! {
|
|||||||
// Primary command handlers
|
// Primary command handlers
|
||||||
// ============================================================================
|
// ============================================================================
|
||||||
|
|
||||||
async fn handle_issues(
|
fn handle_issues(
|
||||||
config_override: Option<&str>,
|
config_override: Option<&str>,
|
||||||
args: IssuesArgs,
|
args: IssuesArgs,
|
||||||
robot_mode: bool,
|
robot_mode: bool,
|
||||||
@@ -334,7 +334,7 @@ async fn handle_issues(
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn handle_mrs(
|
fn handle_mrs(
|
||||||
config_override: Option<&str>,
|
config_override: Option<&str>,
|
||||||
args: MrsArgs,
|
args: MrsArgs,
|
||||||
robot_mode: bool,
|
robot_mode: bool,
|
||||||
|
|||||||
Reference in New Issue
Block a user