Compare commits
4 Commits
f267578aab
...
5786d7f4b6
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
5786d7f4b6 | ||
|
|
d3306114eb | ||
|
|
e6b880cbcb | ||
|
|
121a634653 |
10
migrations/018_fix_assignees_composite_index.sql
Normal file
10
migrations/018_fix_assignees_composite_index.sql
Normal file
@@ -0,0 +1,10 @@
|
|||||||
|
-- Migration 018: Fix composite index on issue_assignees
|
||||||
|
-- Migration 005 created idx_issue_assignees_username(username) as single-column.
|
||||||
|
-- Migration 017 attempted to recreate as (username, issue_id) but IF NOT EXISTS
|
||||||
|
-- silently skipped it. Drop and recreate with the correct composite columns.
|
||||||
|
|
||||||
|
DROP INDEX IF EXISTS idx_issue_assignees_username;
|
||||||
|
CREATE INDEX idx_issue_assignees_username ON issue_assignees(username, issue_id);
|
||||||
|
|
||||||
|
INSERT INTO schema_version (version, applied_at, description)
|
||||||
|
VALUES (18, strftime('%s', 'now') * 1000, 'Fix composite index on issue_assignees');
|
||||||
@@ -851,7 +851,7 @@ fn query_reviews(
|
|||||||
WHERE n.author_username = ?1
|
WHERE n.author_username = ?1
|
||||||
AND n.note_type = 'DiffNote'
|
AND n.note_type = 'DiffNote'
|
||||||
AND n.is_system = 0
|
AND n.is_system = 0
|
||||||
AND m.author_username != ?1
|
AND (m.author_username IS NULL OR m.author_username != ?1)
|
||||||
AND n.created_at >= ?2
|
AND n.created_at >= ?2
|
||||||
AND (?3 IS NULL OR n.project_id = ?3)";
|
AND (?3 IS NULL OR n.project_id = ?3)";
|
||||||
|
|
||||||
@@ -868,7 +868,7 @@ fn query_reviews(
|
|||||||
WHERE n.author_username = ?1
|
WHERE n.author_username = ?1
|
||||||
AND n.note_type = 'DiffNote'
|
AND n.note_type = 'DiffNote'
|
||||||
AND n.is_system = 0
|
AND n.is_system = 0
|
||||||
AND m.author_username != ?1
|
AND (m.author_username IS NULL OR m.author_username != ?1)
|
||||||
AND n.created_at >= ?2
|
AND n.created_at >= ?2
|
||||||
AND (?3 IS NULL OR n.project_id = ?3)";
|
AND (?3 IS NULL OR n.project_id = ?3)";
|
||||||
|
|
||||||
@@ -888,7 +888,7 @@ fn query_reviews(
|
|||||||
WHERE n.author_username = ?1
|
WHERE n.author_username = ?1
|
||||||
AND n.note_type = 'DiffNote'
|
AND n.note_type = 'DiffNote'
|
||||||
AND n.is_system = 0
|
AND n.is_system = 0
|
||||||
AND m.author_username != ?1
|
AND (m.author_username IS NULL OR m.author_username != ?1)
|
||||||
AND ltrim(n.body) LIKE '**%**%'
|
AND ltrim(n.body) LIKE '**%**%'
|
||||||
AND n.created_at >= ?2
|
AND n.created_at >= ?2
|
||||||
AND (?3 IS NULL OR n.project_id = ?3)
|
AND (?3 IS NULL OR n.project_id = ?3)
|
||||||
@@ -1798,7 +1798,12 @@ pub fn print_who_json(run: &WhoRun, args: &WhoArgs, elapsed_ms: u64) {
|
|||||||
meta: RobotMeta { elapsed_ms },
|
meta: RobotMeta { elapsed_ms },
|
||||||
};
|
};
|
||||||
|
|
||||||
println!("{}", serde_json::to_string(&output).unwrap());
|
println!(
|
||||||
|
"{}",
|
||||||
|
serde_json::to_string(&output).unwrap_or_else(|e| {
|
||||||
|
format!(r#"{{"ok":false,"error":{{"code":"INTERNAL_ERROR","message":"JSON serialization failed: {e}"}}}}"#)
|
||||||
|
})
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Serialize)]
|
#[derive(Serialize)]
|
||||||
|
|||||||
@@ -53,6 +53,10 @@ const MIGRATIONS: &[(&str, &str)] = &[
|
|||||||
include_str!("../../migrations/016_mr_file_changes.sql"),
|
include_str!("../../migrations/016_mr_file_changes.sql"),
|
||||||
),
|
),
|
||||||
("017", include_str!("../../migrations/017_who_indexes.sql")),
|
("017", include_str!("../../migrations/017_who_indexes.sql")),
|
||||||
|
(
|
||||||
|
"018",
|
||||||
|
include_str!("../../migrations/018_fix_assignees_composite_index.sql"),
|
||||||
|
),
|
||||||
];
|
];
|
||||||
|
|
||||||
pub fn create_connection(db_path: &Path) -> Result<Connection> {
|
pub fn create_connection(db_path: &Path) -> Result<Connection> {
|
||||||
|
|||||||
@@ -121,12 +121,17 @@ impl AppLock {
|
|||||||
let _ = handle.join();
|
let _ = handle.join();
|
||||||
}
|
}
|
||||||
|
|
||||||
let _ = self.conn.execute(
|
match self.conn.execute(
|
||||||
"DELETE FROM app_locks WHERE name = ? AND owner = ?",
|
"DELETE FROM app_locks WHERE name = ? AND owner = ?",
|
||||||
(&self.name, &self.owner),
|
(&self.name, &self.owner),
|
||||||
);
|
) {
|
||||||
|
Ok(_) => info!(owner = %self.owner, "Lock released"),
|
||||||
info!(owner = %self.owner, "Lock released");
|
Err(e) => error!(
|
||||||
|
owner = %self.owner,
|
||||||
|
error = %e,
|
||||||
|
"Failed to release lock; may require --force on next sync"
|
||||||
|
),
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn start_heartbeat(&mut self) {
|
fn start_heartbeat(&mut self) {
|
||||||
|
|||||||
@@ -25,6 +25,7 @@ pub struct TimelineEvent {
|
|||||||
impl PartialEq for TimelineEvent {
|
impl PartialEq for TimelineEvent {
|
||||||
fn eq(&self, other: &Self) -> bool {
|
fn eq(&self, other: &Self) -> bool {
|
||||||
self.timestamp == other.timestamp
|
self.timestamp == other.timestamp
|
||||||
|
&& self.entity_type == other.entity_type
|
||||||
&& self.entity_id == other.entity_id
|
&& self.entity_id == other.entity_id
|
||||||
&& self.event_type == other.event_type
|
&& self.event_type == other.event_type
|
||||||
}
|
}
|
||||||
@@ -42,6 +43,7 @@ impl Ord for TimelineEvent {
|
|||||||
fn cmp(&self, other: &Self) -> Ordering {
|
fn cmp(&self, other: &Self) -> Ordering {
|
||||||
self.timestamp
|
self.timestamp
|
||||||
.cmp(&other.timestamp)
|
.cmp(&other.timestamp)
|
||||||
|
.then_with(|| self.entity_type.cmp(&other.entity_type))
|
||||||
.then_with(|| self.entity_id.cmp(&other.entity_id))
|
.then_with(|| self.entity_id.cmp(&other.entity_id))
|
||||||
.then_with(|| self.event_type.cmp(&other.event_type))
|
.then_with(|| self.event_type.cmp(&other.event_type))
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,7 +1,12 @@
|
|||||||
pub const CHUNK_ROWID_MULTIPLIER: i64 = 1000;
|
pub const CHUNK_ROWID_MULTIPLIER: i64 = 1000;
|
||||||
|
|
||||||
pub fn encode_rowid(document_id: i64, chunk_index: i64) -> i64 {
|
pub fn encode_rowid(document_id: i64, chunk_index: i64) -> i64 {
|
||||||
document_id * CHUNK_ROWID_MULTIPLIER + chunk_index
|
document_id
|
||||||
|
.checked_mul(CHUNK_ROWID_MULTIPLIER)
|
||||||
|
.and_then(|v| v.checked_add(chunk_index))
|
||||||
|
.unwrap_or_else(|| {
|
||||||
|
panic!("encode_rowid overflow: document_id={document_id}, chunk_index={chunk_index}")
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn decode_rowid(rowid: i64) -> (i64, i64) {
|
pub fn decode_rowid(rowid: i64) -> (i64, i64) {
|
||||||
|
|||||||
@@ -70,15 +70,19 @@ impl GitLabClient {
|
|||||||
headers.insert(ACCEPT, HeaderValue::from_static("application/json"));
|
headers.insert(ACCEPT, HeaderValue::from_static("application/json"));
|
||||||
|
|
||||||
let client = Client::builder()
|
let client = Client::builder()
|
||||||
.default_headers(headers)
|
.default_headers(headers.clone())
|
||||||
.timeout(Duration::from_secs(30))
|
.timeout(Duration::from_secs(30))
|
||||||
.build()
|
.build()
|
||||||
.unwrap_or_else(|e| {
|
.unwrap_or_else(|e| {
|
||||||
warn!(
|
warn!(
|
||||||
error = %e,
|
error = %e,
|
||||||
"Failed to build configured HTTP client; falling back to default client"
|
"Failed to build configured HTTP client; falling back to default client with timeout"
|
||||||
);
|
);
|
||||||
Client::new()
|
Client::builder()
|
||||||
|
.default_headers(headers)
|
||||||
|
.timeout(Duration::from_secs(30))
|
||||||
|
.build()
|
||||||
|
.unwrap_or_else(|_| Client::new())
|
||||||
});
|
});
|
||||||
|
|
||||||
Self {
|
Self {
|
||||||
@@ -180,7 +184,7 @@ impl GitLabClient {
|
|||||||
let text = response.text().await?;
|
let text = response.text().await?;
|
||||||
serde_json::from_str(&text).map_err(|e| {
|
serde_json::from_str(&text).map_err(|e| {
|
||||||
let preview = if text.len() > 500 {
|
let preview = if text.len() > 500 {
|
||||||
&text[..500]
|
&text[..text.floor_char_boundary(500)]
|
||||||
} else {
|
} else {
|
||||||
&text
|
&text
|
||||||
};
|
};
|
||||||
|
|||||||
@@ -96,16 +96,6 @@ async fn ingest_discussions_for_issue(
|
|||||||
result.discussions_fetched += 1;
|
result.discussions_fetched += 1;
|
||||||
|
|
||||||
let payload_bytes = serde_json::to_vec(&gitlab_discussion)?;
|
let payload_bytes = serde_json::to_vec(&gitlab_discussion)?;
|
||||||
let payload_id = store_payload(
|
|
||||||
conn,
|
|
||||||
StorePayloadOptions {
|
|
||||||
project_id: Some(local_project_id),
|
|
||||||
resource_type: "discussion",
|
|
||||||
gitlab_id: &gitlab_discussion.id,
|
|
||||||
json_bytes: &payload_bytes,
|
|
||||||
compress: config.storage.compress_raw_payloads,
|
|
||||||
},
|
|
||||||
)?;
|
|
||||||
|
|
||||||
let normalized = transform_discussion(
|
let normalized = transform_discussion(
|
||||||
&gitlab_discussion,
|
&gitlab_discussion,
|
||||||
@@ -115,6 +105,17 @@ async fn ingest_discussions_for_issue(
|
|||||||
|
|
||||||
let tx = conn.unchecked_transaction()?;
|
let tx = conn.unchecked_transaction()?;
|
||||||
|
|
||||||
|
let payload_id = store_payload(
|
||||||
|
&tx,
|
||||||
|
StorePayloadOptions {
|
||||||
|
project_id: Some(local_project_id),
|
||||||
|
resource_type: "discussion",
|
||||||
|
gitlab_id: &gitlab_discussion.id,
|
||||||
|
json_bytes: &payload_bytes,
|
||||||
|
compress: config.storage.compress_raw_payloads,
|
||||||
|
},
|
||||||
|
)?;
|
||||||
|
|
||||||
upsert_discussion(&tx, &normalized, payload_id)?;
|
upsert_discussion(&tx, &normalized, payload_id)?;
|
||||||
|
|
||||||
let local_discussion_id: i64 = tx.query_row(
|
let local_discussion_id: i64 = tx.query_row(
|
||||||
|
|||||||
@@ -7,6 +7,7 @@ use tracing::{debug, info, warn};
|
|||||||
use crate::Config;
|
use crate::Config;
|
||||||
use crate::core::error::{LoreError, Result};
|
use crate::core::error::{LoreError, Result};
|
||||||
use crate::core::payloads::{StorePayloadOptions, store_payload};
|
use crate::core::payloads::{StorePayloadOptions, store_payload};
|
||||||
|
use crate::core::shutdown::ShutdownSignal;
|
||||||
use crate::core::time::now_ms;
|
use crate::core::time::now_ms;
|
||||||
use crate::documents::SourceType;
|
use crate::documents::SourceType;
|
||||||
use crate::gitlab::GitLabClient;
|
use crate::gitlab::GitLabClient;
|
||||||
@@ -41,6 +42,7 @@ pub async fn ingest_issues(
|
|||||||
config: &Config,
|
config: &Config,
|
||||||
project_id: i64,
|
project_id: i64,
|
||||||
gitlab_project_id: i64,
|
gitlab_project_id: i64,
|
||||||
|
signal: &ShutdownSignal,
|
||||||
) -> Result<IngestIssuesResult> {
|
) -> Result<IngestIssuesResult> {
|
||||||
let mut result = IngestIssuesResult::default();
|
let mut result = IngestIssuesResult::default();
|
||||||
|
|
||||||
@@ -58,6 +60,10 @@ pub async fn ingest_issues(
|
|||||||
let mut last_gitlab_id: Option<i64> = None;
|
let mut last_gitlab_id: Option<i64> = None;
|
||||||
|
|
||||||
while let Some(issue_result) = issues_stream.next().await {
|
while let Some(issue_result) = issues_stream.next().await {
|
||||||
|
if signal.is_cancelled() {
|
||||||
|
info!("Issue ingestion interrupted by shutdown signal");
|
||||||
|
break;
|
||||||
|
}
|
||||||
let issue = issue_result?;
|
let issue = issue_result?;
|
||||||
result.fetched += 1;
|
result.fetched += 1;
|
||||||
|
|
||||||
|
|||||||
@@ -6,6 +6,7 @@ use tracing::{debug, info, warn};
|
|||||||
use crate::Config;
|
use crate::Config;
|
||||||
use crate::core::error::{LoreError, Result};
|
use crate::core::error::{LoreError, Result};
|
||||||
use crate::core::payloads::{StorePayloadOptions, store_payload};
|
use crate::core::payloads::{StorePayloadOptions, store_payload};
|
||||||
|
use crate::core::shutdown::ShutdownSignal;
|
||||||
use crate::core::time::now_ms;
|
use crate::core::time::now_ms;
|
||||||
use crate::documents::SourceType;
|
use crate::documents::SourceType;
|
||||||
use crate::gitlab::GitLabClient;
|
use crate::gitlab::GitLabClient;
|
||||||
@@ -42,6 +43,7 @@ pub async fn ingest_merge_requests(
|
|||||||
project_id: i64,
|
project_id: i64,
|
||||||
gitlab_project_id: i64,
|
gitlab_project_id: i64,
|
||||||
full_sync: bool,
|
full_sync: bool,
|
||||||
|
signal: &ShutdownSignal,
|
||||||
) -> Result<IngestMergeRequestsResult> {
|
) -> Result<IngestMergeRequestsResult> {
|
||||||
let mut result = IngestMergeRequestsResult::default();
|
let mut result = IngestMergeRequestsResult::default();
|
||||||
|
|
||||||
@@ -58,6 +60,10 @@ pub async fn ingest_merge_requests(
|
|||||||
let per_page = 100u32;
|
let per_page = 100u32;
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
|
if signal.is_cancelled() {
|
||||||
|
info!("MR ingestion interrupted by shutdown signal");
|
||||||
|
break;
|
||||||
|
}
|
||||||
let page_result = client
|
let page_result = client
|
||||||
.fetch_merge_requests_page(
|
.fetch_merge_requests_page(
|
||||||
gitlab_project_id,
|
gitlab_project_id,
|
||||||
|
|||||||
@@ -119,7 +119,8 @@ pub async fn ingest_project_issues_with_progress(
|
|||||||
};
|
};
|
||||||
|
|
||||||
emit(ProgressEvent::IssuesFetchStarted);
|
emit(ProgressEvent::IssuesFetchStarted);
|
||||||
let issue_result = ingest_issues(conn, client, config, project_id, gitlab_project_id).await?;
|
let issue_result =
|
||||||
|
ingest_issues(conn, client, config, project_id, gitlab_project_id, signal).await?;
|
||||||
|
|
||||||
result.issues_fetched = issue_result.fetched;
|
result.issues_fetched = issue_result.fetched;
|
||||||
result.issues_upserted = issue_result.upserted;
|
result.issues_upserted = issue_result.upserted;
|
||||||
@@ -329,6 +330,7 @@ pub async fn ingest_project_merge_requests_with_progress(
|
|||||||
project_id,
|
project_id,
|
||||||
gitlab_project_id,
|
gitlab_project_id,
|
||||||
full_sync,
|
full_sync,
|
||||||
|
signal,
|
||||||
)
|
)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
|
|||||||
42
src/main.rs
42
src/main.rs
@@ -845,7 +845,12 @@ fn print_combined_ingest_json(
|
|||||||
meta: RobotMeta { elapsed_ms },
|
meta: RobotMeta { elapsed_ms },
|
||||||
};
|
};
|
||||||
|
|
||||||
println!("{}", serde_json::to_string(&output).unwrap());
|
println!(
|
||||||
|
"{}",
|
||||||
|
serde_json::to_string(&output).unwrap_or_else(|e| {
|
||||||
|
format!(r#"{{"ok":false,"error":{{"code":"INTERNAL_ERROR","message":"JSON serialization failed: {e}"}}}}"#)
|
||||||
|
})
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Serialize)]
|
#[derive(Serialize)]
|
||||||
@@ -874,7 +879,12 @@ fn print_combined_dry_run_json(
|
|||||||
},
|
},
|
||||||
};
|
};
|
||||||
|
|
||||||
println!("{}", serde_json::to_string(&output).unwrap());
|
println!(
|
||||||
|
"{}",
|
||||||
|
serde_json::to_string(&output).unwrap_or_else(|e| {
|
||||||
|
format!(r#"{{"ok":false,"error":{{"code":"INTERNAL_ERROR","message":"JSON serialization failed: {e}"}}}}"#)
|
||||||
|
})
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn handle_count(
|
async fn handle_count(
|
||||||
@@ -966,7 +976,12 @@ fn print_init_json(result: &InitResult) {
|
|||||||
.collect(),
|
.collect(),
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
println!("{}", serde_json::to_string(&output).unwrap());
|
println!(
|
||||||
|
"{}",
|
||||||
|
serde_json::to_string(&output).unwrap_or_else(|e| {
|
||||||
|
format!(r#"{{"ok":false,"error":{{"code":"INTERNAL_ERROR","message":"JSON serialization failed: {e}"}}}}"#)
|
||||||
|
})
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn handle_init(
|
async fn handle_init(
|
||||||
@@ -1198,17 +1213,20 @@ async fn handle_auth_test(
|
|||||||
}
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
if robot_mode {
|
if robot_mode {
|
||||||
let output = FallbackErrorOutput {
|
let output = RobotErrorOutput::from(&e);
|
||||||
error: FallbackError {
|
eprintln!(
|
||||||
code: "AUTH_FAILED".to_string(),
|
"{}",
|
||||||
message: e.to_string(),
|
serde_json::to_string(&output).unwrap_or_else(|_| {
|
||||||
},
|
format!(r#"{{"error":{{"code":"{}","message":"{}"}}}}"#, e.code(), e)
|
||||||
};
|
})
|
||||||
eprintln!("{}", serde_json::to_string(&output)?);
|
);
|
||||||
} else {
|
} else {
|
||||||
eprintln!("{}", style(format!("Error: {e}")).red());
|
eprintln!("{} {}", style("Error:").red(), e);
|
||||||
|
if let Some(suggestion) = e.suggestion() {
|
||||||
|
eprintln!("{} {}", style("Hint:").yellow(), suggestion);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
std::process::exit(5);
|
std::process::exit(e.exit_code());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -98,15 +98,22 @@ pub fn apply_filters(
|
|||||||
}
|
}
|
||||||
|
|
||||||
if !filters.labels.is_empty() {
|
if !filters.labels.is_empty() {
|
||||||
let placeholders: Vec<String> = (0..filters.labels.len())
|
// SQLite has a default limit of 999 bind parameters.
|
||||||
|
let max_labels = 900_usize.saturating_sub(param_idx);
|
||||||
|
let label_slice = if filters.labels.len() > max_labels {
|
||||||
|
&filters.labels[..max_labels]
|
||||||
|
} else {
|
||||||
|
&filters.labels
|
||||||
|
};
|
||||||
|
let placeholders: Vec<String> = (0..label_slice.len())
|
||||||
.map(|i| format!("?{}", param_idx + i))
|
.map(|i| format!("?{}", param_idx + i))
|
||||||
.collect();
|
.collect();
|
||||||
sql.push_str(&format!(
|
sql.push_str(&format!(
|
||||||
" AND EXISTS (SELECT 1 FROM document_labels dl WHERE dl.document_id = d.id AND dl.label_name IN ({}) GROUP BY dl.document_id HAVING COUNT(DISTINCT dl.label_name) = {})",
|
" AND EXISTS (SELECT 1 FROM document_labels dl WHERE dl.document_id = d.id AND dl.label_name IN ({}) GROUP BY dl.document_id HAVING COUNT(DISTINCT dl.label_name) = {})",
|
||||||
placeholders.join(","),
|
placeholders.join(","),
|
||||||
filters.labels.len()
|
label_slice.len()
|
||||||
));
|
));
|
||||||
for label in &filters.labels {
|
for label in label_slice {
|
||||||
params.push(Box::new(label.clone()));
|
params.push(Box::new(label.clone()));
|
||||||
param_idx += 1;
|
param_idx += 1;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -50,8 +50,8 @@ pub fn search_vector(
|
|||||||
.flat_map(|f| f.to_le_bytes())
|
.flat_map(|f| f.to_le_bytes())
|
||||||
.collect();
|
.collect();
|
||||||
|
|
||||||
let max_chunks = max_chunks_per_document(conn);
|
let max_chunks = max_chunks_per_document(conn).max(1);
|
||||||
let multiplier = ((max_chunks as usize * 3 / 2) + 1).max(8);
|
let multiplier = ((max_chunks.unsigned_abs() as usize * 3 / 2) + 1).max(8);
|
||||||
let k = limit * multiplier;
|
let k = limit * multiplier;
|
||||||
|
|
||||||
let mut stmt = conn.prepare(
|
let mut stmt = conn.prepare(
|
||||||
|
|||||||
Reference in New Issue
Block a user