Compare commits
4 Commits
f267578aab
...
5786d7f4b6
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
5786d7f4b6 | ||
|
|
d3306114eb | ||
|
|
e6b880cbcb | ||
|
|
121a634653 |
10
migrations/018_fix_assignees_composite_index.sql
Normal file
10
migrations/018_fix_assignees_composite_index.sql
Normal file
@@ -0,0 +1,10 @@
|
||||
-- Migration 018: Fix composite index on issue_assignees
|
||||
-- Migration 005 created idx_issue_assignees_username(username) as single-column.
|
||||
-- Migration 017 attempted to recreate as (username, issue_id) but IF NOT EXISTS
|
||||
-- silently skipped it. Drop and recreate with the correct composite columns.
|
||||
|
||||
DROP INDEX IF EXISTS idx_issue_assignees_username;
|
||||
CREATE INDEX idx_issue_assignees_username ON issue_assignees(username, issue_id);
|
||||
|
||||
INSERT INTO schema_version (version, applied_at, description)
|
||||
VALUES (18, strftime('%s', 'now') * 1000, 'Fix composite index on issue_assignees');
|
||||
@@ -851,7 +851,7 @@ fn query_reviews(
|
||||
WHERE n.author_username = ?1
|
||||
AND n.note_type = 'DiffNote'
|
||||
AND n.is_system = 0
|
||||
AND m.author_username != ?1
|
||||
AND (m.author_username IS NULL OR m.author_username != ?1)
|
||||
AND n.created_at >= ?2
|
||||
AND (?3 IS NULL OR n.project_id = ?3)";
|
||||
|
||||
@@ -868,7 +868,7 @@ fn query_reviews(
|
||||
WHERE n.author_username = ?1
|
||||
AND n.note_type = 'DiffNote'
|
||||
AND n.is_system = 0
|
||||
AND m.author_username != ?1
|
||||
AND (m.author_username IS NULL OR m.author_username != ?1)
|
||||
AND n.created_at >= ?2
|
||||
AND (?3 IS NULL OR n.project_id = ?3)";
|
||||
|
||||
@@ -888,7 +888,7 @@ fn query_reviews(
|
||||
WHERE n.author_username = ?1
|
||||
AND n.note_type = 'DiffNote'
|
||||
AND n.is_system = 0
|
||||
AND m.author_username != ?1
|
||||
AND (m.author_username IS NULL OR m.author_username != ?1)
|
||||
AND ltrim(n.body) LIKE '**%**%'
|
||||
AND n.created_at >= ?2
|
||||
AND (?3 IS NULL OR n.project_id = ?3)
|
||||
@@ -1798,7 +1798,12 @@ pub fn print_who_json(run: &WhoRun, args: &WhoArgs, elapsed_ms: u64) {
|
||||
meta: RobotMeta { elapsed_ms },
|
||||
};
|
||||
|
||||
println!("{}", serde_json::to_string(&output).unwrap());
|
||||
println!(
|
||||
"{}",
|
||||
serde_json::to_string(&output).unwrap_or_else(|e| {
|
||||
format!(r#"{{"ok":false,"error":{{"code":"INTERNAL_ERROR","message":"JSON serialization failed: {e}"}}}}"#)
|
||||
})
|
||||
);
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
|
||||
@@ -53,6 +53,10 @@ const MIGRATIONS: &[(&str, &str)] = &[
|
||||
include_str!("../../migrations/016_mr_file_changes.sql"),
|
||||
),
|
||||
("017", include_str!("../../migrations/017_who_indexes.sql")),
|
||||
(
|
||||
"018",
|
||||
include_str!("../../migrations/018_fix_assignees_composite_index.sql"),
|
||||
),
|
||||
];
|
||||
|
||||
pub fn create_connection(db_path: &Path) -> Result<Connection> {
|
||||
|
||||
@@ -121,12 +121,17 @@ impl AppLock {
|
||||
let _ = handle.join();
|
||||
}
|
||||
|
||||
let _ = self.conn.execute(
|
||||
match self.conn.execute(
|
||||
"DELETE FROM app_locks WHERE name = ? AND owner = ?",
|
||||
(&self.name, &self.owner),
|
||||
);
|
||||
|
||||
info!(owner = %self.owner, "Lock released");
|
||||
) {
|
||||
Ok(_) => info!(owner = %self.owner, "Lock released"),
|
||||
Err(e) => error!(
|
||||
owner = %self.owner,
|
||||
error = %e,
|
||||
"Failed to release lock; may require --force on next sync"
|
||||
),
|
||||
}
|
||||
}
|
||||
|
||||
fn start_heartbeat(&mut self) {
|
||||
|
||||
@@ -25,6 +25,7 @@ pub struct TimelineEvent {
|
||||
impl PartialEq for TimelineEvent {
|
||||
fn eq(&self, other: &Self) -> bool {
|
||||
self.timestamp == other.timestamp
|
||||
&& self.entity_type == other.entity_type
|
||||
&& self.entity_id == other.entity_id
|
||||
&& self.event_type == other.event_type
|
||||
}
|
||||
@@ -42,6 +43,7 @@ impl Ord for TimelineEvent {
|
||||
fn cmp(&self, other: &Self) -> Ordering {
|
||||
self.timestamp
|
||||
.cmp(&other.timestamp)
|
||||
.then_with(|| self.entity_type.cmp(&other.entity_type))
|
||||
.then_with(|| self.entity_id.cmp(&other.entity_id))
|
||||
.then_with(|| self.event_type.cmp(&other.event_type))
|
||||
}
|
||||
|
||||
@@ -1,7 +1,12 @@
|
||||
pub const CHUNK_ROWID_MULTIPLIER: i64 = 1000;
|
||||
|
||||
pub fn encode_rowid(document_id: i64, chunk_index: i64) -> i64 {
|
||||
document_id * CHUNK_ROWID_MULTIPLIER + chunk_index
|
||||
document_id
|
||||
.checked_mul(CHUNK_ROWID_MULTIPLIER)
|
||||
.and_then(|v| v.checked_add(chunk_index))
|
||||
.unwrap_or_else(|| {
|
||||
panic!("encode_rowid overflow: document_id={document_id}, chunk_index={chunk_index}")
|
||||
})
|
||||
}
|
||||
|
||||
pub fn decode_rowid(rowid: i64) -> (i64, i64) {
|
||||
|
||||
@@ -70,15 +70,19 @@ impl GitLabClient {
|
||||
headers.insert(ACCEPT, HeaderValue::from_static("application/json"));
|
||||
|
||||
let client = Client::builder()
|
||||
.default_headers(headers)
|
||||
.default_headers(headers.clone())
|
||||
.timeout(Duration::from_secs(30))
|
||||
.build()
|
||||
.unwrap_or_else(|e| {
|
||||
warn!(
|
||||
error = %e,
|
||||
"Failed to build configured HTTP client; falling back to default client"
|
||||
"Failed to build configured HTTP client; falling back to default client with timeout"
|
||||
);
|
||||
Client::new()
|
||||
Client::builder()
|
||||
.default_headers(headers)
|
||||
.timeout(Duration::from_secs(30))
|
||||
.build()
|
||||
.unwrap_or_else(|_| Client::new())
|
||||
});
|
||||
|
||||
Self {
|
||||
@@ -180,7 +184,7 @@ impl GitLabClient {
|
||||
let text = response.text().await?;
|
||||
serde_json::from_str(&text).map_err(|e| {
|
||||
let preview = if text.len() > 500 {
|
||||
&text[..500]
|
||||
&text[..text.floor_char_boundary(500)]
|
||||
} else {
|
||||
&text
|
||||
};
|
||||
|
||||
@@ -96,16 +96,6 @@ async fn ingest_discussions_for_issue(
|
||||
result.discussions_fetched += 1;
|
||||
|
||||
let payload_bytes = serde_json::to_vec(&gitlab_discussion)?;
|
||||
let payload_id = store_payload(
|
||||
conn,
|
||||
StorePayloadOptions {
|
||||
project_id: Some(local_project_id),
|
||||
resource_type: "discussion",
|
||||
gitlab_id: &gitlab_discussion.id,
|
||||
json_bytes: &payload_bytes,
|
||||
compress: config.storage.compress_raw_payloads,
|
||||
},
|
||||
)?;
|
||||
|
||||
let normalized = transform_discussion(
|
||||
&gitlab_discussion,
|
||||
@@ -115,6 +105,17 @@ async fn ingest_discussions_for_issue(
|
||||
|
||||
let tx = conn.unchecked_transaction()?;
|
||||
|
||||
let payload_id = store_payload(
|
||||
&tx,
|
||||
StorePayloadOptions {
|
||||
project_id: Some(local_project_id),
|
||||
resource_type: "discussion",
|
||||
gitlab_id: &gitlab_discussion.id,
|
||||
json_bytes: &payload_bytes,
|
||||
compress: config.storage.compress_raw_payloads,
|
||||
},
|
||||
)?;
|
||||
|
||||
upsert_discussion(&tx, &normalized, payload_id)?;
|
||||
|
||||
let local_discussion_id: i64 = tx.query_row(
|
||||
|
||||
@@ -7,6 +7,7 @@ use tracing::{debug, info, warn};
|
||||
use crate::Config;
|
||||
use crate::core::error::{LoreError, Result};
|
||||
use crate::core::payloads::{StorePayloadOptions, store_payload};
|
||||
use crate::core::shutdown::ShutdownSignal;
|
||||
use crate::core::time::now_ms;
|
||||
use crate::documents::SourceType;
|
||||
use crate::gitlab::GitLabClient;
|
||||
@@ -41,6 +42,7 @@ pub async fn ingest_issues(
|
||||
config: &Config,
|
||||
project_id: i64,
|
||||
gitlab_project_id: i64,
|
||||
signal: &ShutdownSignal,
|
||||
) -> Result<IngestIssuesResult> {
|
||||
let mut result = IngestIssuesResult::default();
|
||||
|
||||
@@ -58,6 +60,10 @@ pub async fn ingest_issues(
|
||||
let mut last_gitlab_id: Option<i64> = None;
|
||||
|
||||
while let Some(issue_result) = issues_stream.next().await {
|
||||
if signal.is_cancelled() {
|
||||
info!("Issue ingestion interrupted by shutdown signal");
|
||||
break;
|
||||
}
|
||||
let issue = issue_result?;
|
||||
result.fetched += 1;
|
||||
|
||||
|
||||
@@ -6,6 +6,7 @@ use tracing::{debug, info, warn};
|
||||
use crate::Config;
|
||||
use crate::core::error::{LoreError, Result};
|
||||
use crate::core::payloads::{StorePayloadOptions, store_payload};
|
||||
use crate::core::shutdown::ShutdownSignal;
|
||||
use crate::core::time::now_ms;
|
||||
use crate::documents::SourceType;
|
||||
use crate::gitlab::GitLabClient;
|
||||
@@ -42,6 +43,7 @@ pub async fn ingest_merge_requests(
|
||||
project_id: i64,
|
||||
gitlab_project_id: i64,
|
||||
full_sync: bool,
|
||||
signal: &ShutdownSignal,
|
||||
) -> Result<IngestMergeRequestsResult> {
|
||||
let mut result = IngestMergeRequestsResult::default();
|
||||
|
||||
@@ -58,6 +60,10 @@ pub async fn ingest_merge_requests(
|
||||
let per_page = 100u32;
|
||||
|
||||
loop {
|
||||
if signal.is_cancelled() {
|
||||
info!("MR ingestion interrupted by shutdown signal");
|
||||
break;
|
||||
}
|
||||
let page_result = client
|
||||
.fetch_merge_requests_page(
|
||||
gitlab_project_id,
|
||||
|
||||
@@ -119,7 +119,8 @@ pub async fn ingest_project_issues_with_progress(
|
||||
};
|
||||
|
||||
emit(ProgressEvent::IssuesFetchStarted);
|
||||
let issue_result = ingest_issues(conn, client, config, project_id, gitlab_project_id).await?;
|
||||
let issue_result =
|
||||
ingest_issues(conn, client, config, project_id, gitlab_project_id, signal).await?;
|
||||
|
||||
result.issues_fetched = issue_result.fetched;
|
||||
result.issues_upserted = issue_result.upserted;
|
||||
@@ -329,6 +330,7 @@ pub async fn ingest_project_merge_requests_with_progress(
|
||||
project_id,
|
||||
gitlab_project_id,
|
||||
full_sync,
|
||||
signal,
|
||||
)
|
||||
.await?;
|
||||
|
||||
|
||||
42
src/main.rs
42
src/main.rs
@@ -845,7 +845,12 @@ fn print_combined_ingest_json(
|
||||
meta: RobotMeta { elapsed_ms },
|
||||
};
|
||||
|
||||
println!("{}", serde_json::to_string(&output).unwrap());
|
||||
println!(
|
||||
"{}",
|
||||
serde_json::to_string(&output).unwrap_or_else(|e| {
|
||||
format!(r#"{{"ok":false,"error":{{"code":"INTERNAL_ERROR","message":"JSON serialization failed: {e}"}}}}"#)
|
||||
})
|
||||
);
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
@@ -874,7 +879,12 @@ fn print_combined_dry_run_json(
|
||||
},
|
||||
};
|
||||
|
||||
println!("{}", serde_json::to_string(&output).unwrap());
|
||||
println!(
|
||||
"{}",
|
||||
serde_json::to_string(&output).unwrap_or_else(|e| {
|
||||
format!(r#"{{"ok":false,"error":{{"code":"INTERNAL_ERROR","message":"JSON serialization failed: {e}"}}}}"#)
|
||||
})
|
||||
);
|
||||
}
|
||||
|
||||
async fn handle_count(
|
||||
@@ -966,7 +976,12 @@ fn print_init_json(result: &InitResult) {
|
||||
.collect(),
|
||||
},
|
||||
};
|
||||
println!("{}", serde_json::to_string(&output).unwrap());
|
||||
println!(
|
||||
"{}",
|
||||
serde_json::to_string(&output).unwrap_or_else(|e| {
|
||||
format!(r#"{{"ok":false,"error":{{"code":"INTERNAL_ERROR","message":"JSON serialization failed: {e}"}}}}"#)
|
||||
})
|
||||
);
|
||||
}
|
||||
|
||||
async fn handle_init(
|
||||
@@ -1198,17 +1213,20 @@ async fn handle_auth_test(
|
||||
}
|
||||
Err(e) => {
|
||||
if robot_mode {
|
||||
let output = FallbackErrorOutput {
|
||||
error: FallbackError {
|
||||
code: "AUTH_FAILED".to_string(),
|
||||
message: e.to_string(),
|
||||
},
|
||||
};
|
||||
eprintln!("{}", serde_json::to_string(&output)?);
|
||||
let output = RobotErrorOutput::from(&e);
|
||||
eprintln!(
|
||||
"{}",
|
||||
serde_json::to_string(&output).unwrap_or_else(|_| {
|
||||
format!(r#"{{"error":{{"code":"{}","message":"{}"}}}}"#, e.code(), e)
|
||||
})
|
||||
);
|
||||
} else {
|
||||
eprintln!("{}", style(format!("Error: {e}")).red());
|
||||
eprintln!("{} {}", style("Error:").red(), e);
|
||||
if let Some(suggestion) = e.suggestion() {
|
||||
eprintln!("{} {}", style("Hint:").yellow(), suggestion);
|
||||
}
|
||||
std::process::exit(5);
|
||||
}
|
||||
std::process::exit(e.exit_code());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -98,15 +98,22 @@ pub fn apply_filters(
|
||||
}
|
||||
|
||||
if !filters.labels.is_empty() {
|
||||
let placeholders: Vec<String> = (0..filters.labels.len())
|
||||
// SQLite has a default limit of 999 bind parameters.
|
||||
let max_labels = 900_usize.saturating_sub(param_idx);
|
||||
let label_slice = if filters.labels.len() > max_labels {
|
||||
&filters.labels[..max_labels]
|
||||
} else {
|
||||
&filters.labels
|
||||
};
|
||||
let placeholders: Vec<String> = (0..label_slice.len())
|
||||
.map(|i| format!("?{}", param_idx + i))
|
||||
.collect();
|
||||
sql.push_str(&format!(
|
||||
" AND EXISTS (SELECT 1 FROM document_labels dl WHERE dl.document_id = d.id AND dl.label_name IN ({}) GROUP BY dl.document_id HAVING COUNT(DISTINCT dl.label_name) = {})",
|
||||
placeholders.join(","),
|
||||
filters.labels.len()
|
||||
label_slice.len()
|
||||
));
|
||||
for label in &filters.labels {
|
||||
for label in label_slice {
|
||||
params.push(Box::new(label.clone()));
|
||||
param_idx += 1;
|
||||
}
|
||||
|
||||
@@ -50,8 +50,8 @@ pub fn search_vector(
|
||||
.flat_map(|f| f.to_le_bytes())
|
||||
.collect();
|
||||
|
||||
let max_chunks = max_chunks_per_document(conn);
|
||||
let multiplier = ((max_chunks as usize * 3 / 2) + 1).max(8);
|
||||
let max_chunks = max_chunks_per_document(conn).max(1);
|
||||
let multiplier = ((max_chunks.unsigned_abs() as usize * 3 / 2) + 1).max(8);
|
||||
let k = limit * multiplier;
|
||||
|
||||
let mut stmt = conn.prepare(
|
||||
|
||||
Reference in New Issue
Block a user