feat(db): Add event upsert functions and count queries in events_db module
New module src/core/events_db.rs provides database operations for resource events: - upsert_state_events: Batch INSERT OR REPLACE for state change events, keyed on UNIQUE(gitlab_id, project_id). Wraps in a savepoint for atomicity per entity batch. Maps GitLabStateEvent fields including optional user, source_commit, and source_merge_request_iid. - upsert_label_events: Same pattern for label add/remove events, extracting label.name for denormalized storage. - upsert_milestone_events: Same pattern for milestone assignment events, storing both milestone.title and milestone.id. All three upsert functions: - Take &mut Connection (required for savepoint creation) - Use prepare_cached for statement reuse across batch iterations - Convert ISO timestamps via iso_to_ms_strict for ms-epoch storage - Propagate rusqlite errors via the #[from] LoreError::Database path - Return the count of events processed Supporting functions: - resolve_entity_ids: Maps entity_type string to (issue_id, MR_id) pair with exactly-one-non-NULL invariant matching the CHECK constraints - count_events: Queries all three event tables with conditional COUNT aggregations, returning EventCounts struct. Uses unwrap_or((0, 0)) for graceful degradation when tables don't exist (pre-migration 011). Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
232
src/core/events_db.rs
Normal file
232
src/core/events_db.rs
Normal file
@@ -0,0 +1,232 @@
|
||||
//! Database upsert functions for resource events (state, label, milestone).
|
||||
|
||||
use rusqlite::Connection;
|
||||
|
||||
use super::error::{LoreError, Result};
|
||||
use super::time::iso_to_ms_strict;
|
||||
use crate::gitlab::types::{GitLabLabelEvent, GitLabMilestoneEvent, GitLabStateEvent};
|
||||
|
||||
/// Upsert state events for an entity.
|
||||
///
|
||||
/// Uses INSERT OR REPLACE keyed on UNIQUE(gitlab_id, project_id).
|
||||
/// Wraps in a savepoint for atomicity per entity.
|
||||
pub fn upsert_state_events(
|
||||
conn: &mut Connection,
|
||||
project_id: i64,
|
||||
entity_type: &str,
|
||||
entity_local_id: i64,
|
||||
events: &[GitLabStateEvent],
|
||||
) -> Result<usize> {
|
||||
let (issue_id, merge_request_id) = resolve_entity_ids(entity_type, entity_local_id)?;
|
||||
|
||||
let sp = conn.savepoint()?;
|
||||
|
||||
let mut stmt = sp.prepare_cached(
|
||||
"INSERT OR REPLACE INTO resource_state_events
|
||||
(gitlab_id, project_id, issue_id, merge_request_id, state,
|
||||
actor_gitlab_id, actor_username, created_at,
|
||||
source_commit, source_merge_request_iid)
|
||||
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10)",
|
||||
)?;
|
||||
|
||||
let mut count = 0;
|
||||
for event in events {
|
||||
let created_at = iso_to_ms_strict(&event.created_at).map_err(LoreError::Other)?;
|
||||
let actor_id = event.user.as_ref().map(|u| u.id);
|
||||
let actor_username = event.user.as_ref().map(|u| u.username.as_str());
|
||||
let source_mr_iid = event.source_merge_request.as_ref().map(|mr| mr.iid);
|
||||
|
||||
stmt.execute(rusqlite::params![
|
||||
event.id,
|
||||
project_id,
|
||||
issue_id,
|
||||
merge_request_id,
|
||||
event.state,
|
||||
actor_id,
|
||||
actor_username,
|
||||
created_at,
|
||||
event.source_commit,
|
||||
source_mr_iid,
|
||||
])?;
|
||||
count += 1;
|
||||
}
|
||||
|
||||
drop(stmt);
|
||||
sp.commit()?;
|
||||
|
||||
Ok(count)
|
||||
}
|
||||
|
||||
/// Upsert label events for an entity.
|
||||
pub fn upsert_label_events(
|
||||
conn: &mut Connection,
|
||||
project_id: i64,
|
||||
entity_type: &str,
|
||||
entity_local_id: i64,
|
||||
events: &[GitLabLabelEvent],
|
||||
) -> Result<usize> {
|
||||
let (issue_id, merge_request_id) = resolve_entity_ids(entity_type, entity_local_id)?;
|
||||
|
||||
let sp = conn.savepoint()?;
|
||||
|
||||
let mut stmt = sp.prepare_cached(
|
||||
"INSERT OR REPLACE INTO resource_label_events
|
||||
(gitlab_id, project_id, issue_id, merge_request_id, action,
|
||||
label_name, actor_gitlab_id, actor_username, created_at)
|
||||
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9)",
|
||||
)?;
|
||||
|
||||
let mut count = 0;
|
||||
for event in events {
|
||||
let created_at = iso_to_ms_strict(&event.created_at).map_err(LoreError::Other)?;
|
||||
let actor_id = event.user.as_ref().map(|u| u.id);
|
||||
let actor_username = event.user.as_ref().map(|u| u.username.as_str());
|
||||
|
||||
stmt.execute(rusqlite::params![
|
||||
event.id,
|
||||
project_id,
|
||||
issue_id,
|
||||
merge_request_id,
|
||||
event.action,
|
||||
event.label.name,
|
||||
actor_id,
|
||||
actor_username,
|
||||
created_at,
|
||||
])?;
|
||||
count += 1;
|
||||
}
|
||||
|
||||
drop(stmt);
|
||||
sp.commit()?;
|
||||
|
||||
Ok(count)
|
||||
}
|
||||
|
||||
/// Upsert milestone events for an entity.
|
||||
pub fn upsert_milestone_events(
|
||||
conn: &mut Connection,
|
||||
project_id: i64,
|
||||
entity_type: &str,
|
||||
entity_local_id: i64,
|
||||
events: &[GitLabMilestoneEvent],
|
||||
) -> Result<usize> {
|
||||
let (issue_id, merge_request_id) = resolve_entity_ids(entity_type, entity_local_id)?;
|
||||
|
||||
let sp = conn.savepoint()?;
|
||||
|
||||
let mut stmt = sp.prepare_cached(
|
||||
"INSERT OR REPLACE INTO resource_milestone_events
|
||||
(gitlab_id, project_id, issue_id, merge_request_id, action,
|
||||
milestone_title, milestone_id, actor_gitlab_id, actor_username, created_at)
|
||||
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10)",
|
||||
)?;
|
||||
|
||||
let mut count = 0;
|
||||
for event in events {
|
||||
let created_at = iso_to_ms_strict(&event.created_at).map_err(LoreError::Other)?;
|
||||
let actor_id = event.user.as_ref().map(|u| u.id);
|
||||
let actor_username = event.user.as_ref().map(|u| u.username.as_str());
|
||||
|
||||
stmt.execute(rusqlite::params![
|
||||
event.id,
|
||||
project_id,
|
||||
issue_id,
|
||||
merge_request_id,
|
||||
event.action,
|
||||
event.milestone.title,
|
||||
event.milestone.id,
|
||||
actor_id,
|
||||
actor_username,
|
||||
created_at,
|
||||
])?;
|
||||
count += 1;
|
||||
}
|
||||
|
||||
drop(stmt);
|
||||
sp.commit()?;
|
||||
|
||||
Ok(count)
|
||||
}
|
||||
|
||||
/// Resolve entity type string to (issue_id, merge_request_id) pair.
|
||||
/// Exactly one is Some, the other is None.
|
||||
fn resolve_entity_ids(entity_type: &str, entity_local_id: i64) -> Result<(Option<i64>, Option<i64>)> {
|
||||
match entity_type {
|
||||
"issue" => Ok((Some(entity_local_id), None)),
|
||||
"merge_request" => Ok((None, Some(entity_local_id))),
|
||||
_ => Err(LoreError::Other(format!(
|
||||
"Invalid entity type for resource events: {entity_type}"
|
||||
))),
|
||||
}
|
||||
}
|
||||
|
||||
/// Count resource events by type for the count command.
|
||||
pub fn count_events(conn: &Connection) -> Result<EventCounts> {
|
||||
let mut counts = EventCounts::default();
|
||||
|
||||
// State events
|
||||
let row: (i64, i64) = conn
|
||||
.query_row(
|
||||
"SELECT
|
||||
COUNT(CASE WHEN issue_id IS NOT NULL THEN 1 END),
|
||||
COUNT(CASE WHEN merge_request_id IS NOT NULL THEN 1 END)
|
||||
FROM resource_state_events",
|
||||
[],
|
||||
|row| Ok((row.get(0)?, row.get(1)?)),
|
||||
)
|
||||
.unwrap_or((0, 0));
|
||||
counts.state_issue = row.0 as usize;
|
||||
counts.state_mr = row.1 as usize;
|
||||
|
||||
// Label events
|
||||
let row: (i64, i64) = conn
|
||||
.query_row(
|
||||
"SELECT
|
||||
COUNT(CASE WHEN issue_id IS NOT NULL THEN 1 END),
|
||||
COUNT(CASE WHEN merge_request_id IS NOT NULL THEN 1 END)
|
||||
FROM resource_label_events",
|
||||
[],
|
||||
|row| Ok((row.get(0)?, row.get(1)?)),
|
||||
)
|
||||
.unwrap_or((0, 0));
|
||||
counts.label_issue = row.0 as usize;
|
||||
counts.label_mr = row.1 as usize;
|
||||
|
||||
// Milestone events
|
||||
let row: (i64, i64) = conn
|
||||
.query_row(
|
||||
"SELECT
|
||||
COUNT(CASE WHEN issue_id IS NOT NULL THEN 1 END),
|
||||
COUNT(CASE WHEN merge_request_id IS NOT NULL THEN 1 END)
|
||||
FROM resource_milestone_events",
|
||||
[],
|
||||
|row| Ok((row.get(0)?, row.get(1)?)),
|
||||
)
|
||||
.unwrap_or((0, 0));
|
||||
counts.milestone_issue = row.0 as usize;
|
||||
counts.milestone_mr = row.1 as usize;
|
||||
|
||||
Ok(counts)
|
||||
}
|
||||
|
||||
/// Event counts broken down by type and entity.
|
||||
#[derive(Debug, Default)]
|
||||
pub struct EventCounts {
|
||||
pub state_issue: usize,
|
||||
pub state_mr: usize,
|
||||
pub label_issue: usize,
|
||||
pub label_mr: usize,
|
||||
pub milestone_issue: usize,
|
||||
pub milestone_mr: usize,
|
||||
}
|
||||
|
||||
impl EventCounts {
|
||||
pub fn total(&self) -> usize {
|
||||
self.state_issue
|
||||
+ self.state_mr
|
||||
+ self.label_issue
|
||||
+ self.label_mr
|
||||
+ self.milestone_issue
|
||||
+ self.milestone_mr
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user