4 Commits

Author SHA1 Message Date
Taylor Eernisse
5786d7f4b6 fix: defensive hardening — lock release logging, SQLite param guard, vector cast
Three defensive improvements found via peer code review:

1. lock.rs: Lock release errors were silently discarded with `let _ =`.
   If the DELETE failed (disk full, corruption), the lock stayed in the
   database with no diagnostic. Next sync would require --force with no
   clue why. Now logs with error!() including the underlying error message.

2. filters.rs: Dynamic SQL label filter construction had no upper bound
   on bind parameters. With many combined filters, param_idx + labels.len()
   could exceed SQLite's default limit of 999 bind parameters
   (SQLITE_MAX_VARIABLE_NUMBER), producing an opaque runtime error.
   Added a guard that caps labels at 900 - param_idx.

3. vector.rs: max_chunks_per_document returned i64 which was cast to
   usize. A negative value from a corrupt database would wrap to a huge
   number, causing overflow in the multiplier calculation. Now clamped
   to .max(1) and cast via unsigned_abs().

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-08 07:55:54 -05:00
Taylor Eernisse
d3306114eb fix(ingestion): pass ShutdownSignal into issue and MR pagination loops
The orchestrator already accepted a ShutdownSignal but only checked it
between phases (after all issues fetched, before discussions). The inner
loops in ingest_issues() and ingest_merge_requests() consumed entire
paginated streams without checking for cancellation.

On a large initial sync (thousands of issues/MRs), Ctrl+C could be
unresponsive for minutes while the current entity type finished draining.

Now both functions accept &ShutdownSignal and check is_cancelled() at
the top of each iteration, breaking out promptly and committing the
cursor for whatever was already processed.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-08 07:55:36 -05:00
Taylor Eernisse
e6b880cbcb fix: prevent panics in robot-mode JSON output and arithmetic paths
Peer code review found multiple panic-reachable paths:

1. serde_json::to_string().unwrap() in 4 robot-mode output functions
   (who.rs, main.rs x3). If serialization ever failed (e.g., NaN from
   edge-case division), the CLI would panic with an unhelpful stack trace.
   Replaced with unwrap_or_else that emits a structured JSON error fallback.

2. encode_rowid() in chunk_ids.rs used unchecked multiplication
   (document_id * 1000). On extreme document IDs this could silently wrap
   in release mode, causing embedding rowid collisions. Now uses
   checked_mul + checked_add with a diagnostic panic message.

3. HTTP response body truncation at byte index 500 in client.rs could
   split a multi-byte UTF-8 character, causing a panic. Now uses
   floor_char_boundary(500) for safe truncation.

4. who.rs reviews mode: SQL used `m.author_username != ?1` which silently
   dropped MRs with NULL author_username (in SQL, comparing NULL with !=
   yields NULL, which the WHERE clause treats as false).
   Changed to `(m.author_username IS NULL OR m.author_username != ?1)`
   to match the pattern already used in expert mode.

5. handle_auth_test hardcoded exit code 5 for all errors regardless of
   type. Config not found (20), token not set (4), and network errors (8)
   all incorrectly returned 5. Now uses e.exit_code() from the actual
   LoreError, with proper suggestion hints in human mode.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-08 07:55:20 -05:00
Taylor Eernisse
121a634653 fix: critical data integrity — timeline dedup, discussion atomicity, index collision
Three correctness bugs found via peer code review:

1. TimelineEvent PartialEq/Ord omitted entity_type — issue #42 and MR #42
   with the same timestamp and event_type were treated as equal. In a
   BTreeSet or dedup, one would silently be dropped. Added entity_type to
   both PartialEq and Ord comparisons.

2. discussions.rs: store_payload() was called outside the transaction
   (on bare conn) while upsert_discussion/notes were inside. A crash
   between them left orphaned payload rows. Moved store_payload inside
   the unchecked_transaction block, matching mr_discussions.rs pattern.

3. Migration 017 created idx_issue_assignees_username(username, issue_id)
   but migration 005 already created the same index name with just
   (username). SQLite's IF NOT EXISTS silently skipped the composite
   version on every existing database. New migration 018 drops and
   recreates the index with correct composite columns.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-08 07:54:59 -05:00
14 changed files with 116 additions and 41 deletions

View File

@@ -0,0 +1,10 @@
-- Migration 018: Fix composite index on issue_assignees
-- Migration 005 created idx_issue_assignees_username(username) as single-column.
-- Migration 017 attempted to recreate as (username, issue_id) but IF NOT EXISTS
-- silently skipped it. Drop and recreate with the correct composite columns.
DROP INDEX IF EXISTS idx_issue_assignees_username;
CREATE INDEX idx_issue_assignees_username ON issue_assignees(username, issue_id);
INSERT INTO schema_version (version, applied_at, description)
VALUES (18, strftime('%s', 'now') * 1000, 'Fix composite index on issue_assignees');

View File

@@ -851,7 +851,7 @@ fn query_reviews(
WHERE n.author_username = ?1
AND n.note_type = 'DiffNote'
AND n.is_system = 0
AND m.author_username != ?1
AND (m.author_username IS NULL OR m.author_username != ?1)
AND n.created_at >= ?2
AND (?3 IS NULL OR n.project_id = ?3)";
@@ -868,7 +868,7 @@ fn query_reviews(
WHERE n.author_username = ?1
AND n.note_type = 'DiffNote'
AND n.is_system = 0
AND m.author_username != ?1
AND (m.author_username IS NULL OR m.author_username != ?1)
AND n.created_at >= ?2
AND (?3 IS NULL OR n.project_id = ?3)";
@@ -888,7 +888,7 @@ fn query_reviews(
WHERE n.author_username = ?1
AND n.note_type = 'DiffNote'
AND n.is_system = 0
AND m.author_username != ?1
AND (m.author_username IS NULL OR m.author_username != ?1)
AND ltrim(n.body) LIKE '**%**%'
AND n.created_at >= ?2
AND (?3 IS NULL OR n.project_id = ?3)
@@ -1798,7 +1798,12 @@ pub fn print_who_json(run: &WhoRun, args: &WhoArgs, elapsed_ms: u64) {
meta: RobotMeta { elapsed_ms },
};
println!("{}", serde_json::to_string(&output).unwrap());
println!(
"{}",
serde_json::to_string(&output).unwrap_or_else(|e| {
format!(r#"{{"ok":false,"error":{{"code":"INTERNAL_ERROR","message":"JSON serialization failed: {e}"}}}}"#)
})
);
}
#[derive(Serialize)]

View File

@@ -53,6 +53,10 @@ const MIGRATIONS: &[(&str, &str)] = &[
include_str!("../../migrations/016_mr_file_changes.sql"),
),
("017", include_str!("../../migrations/017_who_indexes.sql")),
(
"018",
include_str!("../../migrations/018_fix_assignees_composite_index.sql"),
),
];
pub fn create_connection(db_path: &Path) -> Result<Connection> {

View File

@@ -121,12 +121,17 @@ impl AppLock {
let _ = handle.join();
}
let _ = self.conn.execute(
match self.conn.execute(
"DELETE FROM app_locks WHERE name = ? AND owner = ?",
(&self.name, &self.owner),
);
info!(owner = %self.owner, "Lock released");
) {
Ok(_) => info!(owner = %self.owner, "Lock released"),
Err(e) => error!(
owner = %self.owner,
error = %e,
"Failed to release lock; may require --force on next sync"
),
}
}
fn start_heartbeat(&mut self) {

View File

@@ -25,6 +25,7 @@ pub struct TimelineEvent {
impl PartialEq for TimelineEvent {
fn eq(&self, other: &Self) -> bool {
self.timestamp == other.timestamp
&& self.entity_type == other.entity_type
&& self.entity_id == other.entity_id
&& self.event_type == other.event_type
}
@@ -42,6 +43,7 @@ impl Ord for TimelineEvent {
fn cmp(&self, other: &Self) -> Ordering {
self.timestamp
.cmp(&other.timestamp)
.then_with(|| self.entity_type.cmp(&other.entity_type))
.then_with(|| self.entity_id.cmp(&other.entity_id))
.then_with(|| self.event_type.cmp(&other.event_type))
}

View File

@@ -1,7 +1,12 @@
pub const CHUNK_ROWID_MULTIPLIER: i64 = 1000;
pub fn encode_rowid(document_id: i64, chunk_index: i64) -> i64 {
document_id * CHUNK_ROWID_MULTIPLIER + chunk_index
document_id
.checked_mul(CHUNK_ROWID_MULTIPLIER)
.and_then(|v| v.checked_add(chunk_index))
.unwrap_or_else(|| {
panic!("encode_rowid overflow: document_id={document_id}, chunk_index={chunk_index}")
})
}
pub fn decode_rowid(rowid: i64) -> (i64, i64) {

View File

@@ -70,15 +70,19 @@ impl GitLabClient {
headers.insert(ACCEPT, HeaderValue::from_static("application/json"));
let client = Client::builder()
.default_headers(headers)
.default_headers(headers.clone())
.timeout(Duration::from_secs(30))
.build()
.unwrap_or_else(|e| {
warn!(
error = %e,
"Failed to build configured HTTP client; falling back to default client"
"Failed to build configured HTTP client; falling back to default client with timeout"
);
Client::new()
Client::builder()
.default_headers(headers)
.timeout(Duration::from_secs(30))
.build()
.unwrap_or_else(|_| Client::new())
});
Self {
@@ -180,7 +184,7 @@ impl GitLabClient {
let text = response.text().await?;
serde_json::from_str(&text).map_err(|e| {
let preview = if text.len() > 500 {
&text[..500]
&text[..text.floor_char_boundary(500)]
} else {
&text
};

View File

@@ -96,16 +96,6 @@ async fn ingest_discussions_for_issue(
result.discussions_fetched += 1;
let payload_bytes = serde_json::to_vec(&gitlab_discussion)?;
let payload_id = store_payload(
conn,
StorePayloadOptions {
project_id: Some(local_project_id),
resource_type: "discussion",
gitlab_id: &gitlab_discussion.id,
json_bytes: &payload_bytes,
compress: config.storage.compress_raw_payloads,
},
)?;
let normalized = transform_discussion(
&gitlab_discussion,
@@ -115,6 +105,17 @@ async fn ingest_discussions_for_issue(
let tx = conn.unchecked_transaction()?;
let payload_id = store_payload(
&tx,
StorePayloadOptions {
project_id: Some(local_project_id),
resource_type: "discussion",
gitlab_id: &gitlab_discussion.id,
json_bytes: &payload_bytes,
compress: config.storage.compress_raw_payloads,
},
)?;
upsert_discussion(&tx, &normalized, payload_id)?;
let local_discussion_id: i64 = tx.query_row(

View File

@@ -7,6 +7,7 @@ use tracing::{debug, info, warn};
use crate::Config;
use crate::core::error::{LoreError, Result};
use crate::core::payloads::{StorePayloadOptions, store_payload};
use crate::core::shutdown::ShutdownSignal;
use crate::core::time::now_ms;
use crate::documents::SourceType;
use crate::gitlab::GitLabClient;
@@ -41,6 +42,7 @@ pub async fn ingest_issues(
config: &Config,
project_id: i64,
gitlab_project_id: i64,
signal: &ShutdownSignal,
) -> Result<IngestIssuesResult> {
let mut result = IngestIssuesResult::default();
@@ -58,6 +60,10 @@ pub async fn ingest_issues(
let mut last_gitlab_id: Option<i64> = None;
while let Some(issue_result) = issues_stream.next().await {
if signal.is_cancelled() {
info!("Issue ingestion interrupted by shutdown signal");
break;
}
let issue = issue_result?;
result.fetched += 1;

View File

@@ -6,6 +6,7 @@ use tracing::{debug, info, warn};
use crate::Config;
use crate::core::error::{LoreError, Result};
use crate::core::payloads::{StorePayloadOptions, store_payload};
use crate::core::shutdown::ShutdownSignal;
use crate::core::time::now_ms;
use crate::documents::SourceType;
use crate::gitlab::GitLabClient;
@@ -42,6 +43,7 @@ pub async fn ingest_merge_requests(
project_id: i64,
gitlab_project_id: i64,
full_sync: bool,
signal: &ShutdownSignal,
) -> Result<IngestMergeRequestsResult> {
let mut result = IngestMergeRequestsResult::default();
@@ -58,6 +60,10 @@ pub async fn ingest_merge_requests(
let per_page = 100u32;
loop {
if signal.is_cancelled() {
info!("MR ingestion interrupted by shutdown signal");
break;
}
let page_result = client
.fetch_merge_requests_page(
gitlab_project_id,

View File

@@ -119,7 +119,8 @@ pub async fn ingest_project_issues_with_progress(
};
emit(ProgressEvent::IssuesFetchStarted);
let issue_result = ingest_issues(conn, client, config, project_id, gitlab_project_id).await?;
let issue_result =
ingest_issues(conn, client, config, project_id, gitlab_project_id, signal).await?;
result.issues_fetched = issue_result.fetched;
result.issues_upserted = issue_result.upserted;
@@ -329,6 +330,7 @@ pub async fn ingest_project_merge_requests_with_progress(
project_id,
gitlab_project_id,
full_sync,
signal,
)
.await?;

View File

@@ -845,7 +845,12 @@ fn print_combined_ingest_json(
meta: RobotMeta { elapsed_ms },
};
println!("{}", serde_json::to_string(&output).unwrap());
println!(
"{}",
serde_json::to_string(&output).unwrap_or_else(|e| {
format!(r#"{{"ok":false,"error":{{"code":"INTERNAL_ERROR","message":"JSON serialization failed: {e}"}}}}"#)
})
);
}
#[derive(Serialize)]
@@ -874,7 +879,12 @@ fn print_combined_dry_run_json(
},
};
println!("{}", serde_json::to_string(&output).unwrap());
println!(
"{}",
serde_json::to_string(&output).unwrap_or_else(|e| {
format!(r#"{{"ok":false,"error":{{"code":"INTERNAL_ERROR","message":"JSON serialization failed: {e}"}}}}"#)
})
);
}
async fn handle_count(
@@ -966,7 +976,12 @@ fn print_init_json(result: &InitResult) {
.collect(),
},
};
println!("{}", serde_json::to_string(&output).unwrap());
println!(
"{}",
serde_json::to_string(&output).unwrap_or_else(|e| {
format!(r#"{{"ok":false,"error":{{"code":"INTERNAL_ERROR","message":"JSON serialization failed: {e}"}}}}"#)
})
);
}
async fn handle_init(
@@ -1198,17 +1213,20 @@ async fn handle_auth_test(
}
Err(e) => {
if robot_mode {
let output = FallbackErrorOutput {
error: FallbackError {
code: "AUTH_FAILED".to_string(),
message: e.to_string(),
},
};
eprintln!("{}", serde_json::to_string(&output)?);
let output = RobotErrorOutput::from(&e);
eprintln!(
"{}",
serde_json::to_string(&output).unwrap_or_else(|_| {
format!(r#"{{"error":{{"code":"{}","message":"{}"}}}}"#, e.code(), e)
})
);
} else {
eprintln!("{}", style(format!("Error: {e}")).red());
eprintln!("{} {}", style("Error:").red(), e);
if let Some(suggestion) = e.suggestion() {
eprintln!("{} {}", style("Hint:").yellow(), suggestion);
}
}
std::process::exit(5);
std::process::exit(e.exit_code());
}
}
}

View File

@@ -98,15 +98,22 @@ pub fn apply_filters(
}
if !filters.labels.is_empty() {
let placeholders: Vec<String> = (0..filters.labels.len())
// SQLite has a default limit of 999 bind parameters.
let max_labels = 900_usize.saturating_sub(param_idx);
let label_slice = if filters.labels.len() > max_labels {
&filters.labels[..max_labels]
} else {
&filters.labels
};
let placeholders: Vec<String> = (0..label_slice.len())
.map(|i| format!("?{}", param_idx + i))
.collect();
sql.push_str(&format!(
" AND EXISTS (SELECT 1 FROM document_labels dl WHERE dl.document_id = d.id AND dl.label_name IN ({}) GROUP BY dl.document_id HAVING COUNT(DISTINCT dl.label_name) = {})",
placeholders.join(","),
filters.labels.len()
label_slice.len()
));
for label in &filters.labels {
for label in label_slice {
params.push(Box::new(label.clone()));
param_idx += 1;
}

View File

@@ -50,8 +50,8 @@ pub fn search_vector(
.flat_map(|f| f.to_le_bytes())
.collect();
let max_chunks = max_chunks_per_document(conn);
let multiplier = ((max_chunks as usize * 3 / 2) + 1).max(8);
let max_chunks = max_chunks_per_document(conn).max(1);
let multiplier = ((max_chunks.unsigned_abs() as usize * 3 / 2) + 1).max(8);
let k = limit * multiplier;
let mut stmt = conn.prepare(