diff --git a/plans/asupersync-migration.md b/plans/asupersync-migration.md new file mode 100644 index 0000000..3a791f8 --- /dev/null +++ b/plans/asupersync-migration.md @@ -0,0 +1,696 @@ +# Plan: Replace Tokio + Reqwest with Asupersync + +**Date:** 2026-03-06 +**Status:** Draft +**Decisions:** Adapter layer (yes), timeouts in adapter, deep Cx threading, reference doc only + +--- + +## Context + +Gitlore uses tokio as its async runtime and reqwest as its HTTP client. Both work, but: + +- Ctrl+C during `join_all` silently drops in-flight HTTP requests with no cleanup +- `ShutdownSignal` is a hand-rolled `AtomicBool` with no structured cancellation +- No deterministic testing for concurrent ingestion patterns +- tokio provides no structured concurrency guarantees + +Asupersync is a cancel-correct async runtime with region-owned tasks, obligation tracking, and deterministic lab testing. Replacing tokio+reqwest gives us structured shutdown, cancel-correct ingestion, and testable concurrency. 
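For reference, a plausible reconstruction of the hand-rolled `ShutdownSignal` described above (the `cancel()` name matches the Phase 0a code later in this plan; `is_cancelled()` is an assumed accessor name):

```rust
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};

/// Reconstructed sketch of the hand-rolled signal: a shared boolean,
/// flipped by the Ctrl+C handler and polled by ingestion loops.
#[derive(Clone)]
pub struct ShutdownSignal(Arc<AtomicBool>);

impl ShutdownSignal {
    pub fn new() -> Self {
        Self(Arc::new(AtomicBool::new(false)))
    }

    pub fn cancel(&self) {
        self.0.store(true, Ordering::SeqCst);
    }

    pub fn is_cancelled(&self) -> bool {
        self.0.load(Ordering::SeqCst)
    }
}

fn main() {
    let signal = ShutdownSignal::new();
    assert!(!signal.is_cancelled());
    signal.cancel();
    // The flag flips, but nothing here cancels in-flight futures: a
    // `join_all` that already polled past its check keeps running.
    assert!(signal.is_cancelled());
}
```

The flag is purely advisory: tasks that never poll it, or that are dropped mid-request on process exit, get no cleanup, which is exactly the gap region-owned tasks close.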
+ +**Trade-offs accepted:** +- Nightly Rust required (asupersync dependency) +- Pre-1.0 runtime dependency (mitigated by adapter layer + version pinning) +- Deeper function signature changes for Cx threading + +--- + +## Current Tokio Usage Inventory + +### Production code (must migrate) + +| Location | API | Purpose | +|----------|-----|---------| +| `main.rs:53` | `#[tokio::main]` | Runtime entrypoint | +| `main.rs` (4 sites) | `tokio::spawn` + `tokio::signal::ctrl_c` | Ctrl+C signal handlers | +| `gitlab/client.rs:9` | `tokio::sync::Mutex` | Rate limiter lock | +| `gitlab/client.rs:10` | `tokio::time::sleep` | Rate limiter backoff | +| `gitlab/client.rs:729,736` | `tokio::join!` | Parallel pagination | + +### Production code (reqwest -- must replace) + +| Location | Usage | +|----------|-------| +| `gitlab/client.rs` | REST API: GET with headers/query, response status/headers/JSON, pagination via x-next-page and Link headers, retry on 429 | +| `gitlab/graphql.rs` | GraphQL: POST with Bearer auth + JSON body, response JSON parsing | +| `embedding/ollama.rs` | Ollama: GET health check, POST JSON embedding requests | + +### Test code (keep on tokio via dev-dep) + +| File | Tests | Uses wiremock? | +|------|-------|----------------| +| `gitlab/graphql_tests.rs` | 30 | Yes | +| `gitlab/client_tests.rs` | 4 | Yes | +| `embedding/pipeline_tests.rs` | 4 | Yes | +| `ingestion/surgical_tests.rs` | 4 async | Yes | + +### Test code (switch to asupersync) + +| File | Tests | Why safe | +|------|-------|----------| +| `core/timeline_seed_tests.rs` | 13 | Pure CPU/SQLite, no HTTP, no tokio APIs | + +### Test code (already sync `#[test]` -- no changes) + +~35 test files across documents/, core/, embedding/, gitlab/transformers/, ingestion/, cli/commands/, tests/ + +--- + +## Phase 0: Preparation (no runtime change) + +Goal: Reduce tokio surface area before the swap. Each step is independently valuable. + +### 0a. 
Extract signal handler

The 4 identical Ctrl+C handlers in `main.rs` (lines 1020, 2341, 2493, 2524) become one function in `core/shutdown.rs`:

```rust
pub fn install_ctrl_c_handler(signal: ShutdownSignal) {
    tokio::spawn(async move {
        let _ = tokio::signal::ctrl_c().await;
        eprintln!("\nInterrupted, finishing current batch... (Ctrl+C again to force quit)");
        signal.cancel();
        let _ = tokio::signal::ctrl_c().await;
        std::process::exit(130);
    });
}
```

4 spawn sites -> 1 function. The function body changes in Phase 3.

### 0b. Replace tokio::sync::Mutex with std::sync::Mutex

In `gitlab/client.rs`, the rate limiter lock guards a tiny sync critical section (check `Instant::now()`, compute delay). No async work happens inside the lock, so `std::sync::Mutex` is correct and removes a tokio dependency:

```rust
// Before
use tokio::sync::Mutex;
let delay = self.rate_limiter.lock().await.check_delay();

// After
use std::sync::Mutex;
let delay = self.rate_limiter.lock().expect("rate limiter poisoned").check_delay();
```

Note: `.expect()` over `.unwrap()` for clarity. Poisoning is near-impossible here (the critical section is a trivial `Instant::now()` check), but the explicit message aids debugging if it ever fires.

### 0c. Replace tokio::join! with futures::join!

In `gitlab/client.rs:729,736`. `futures::join!` is runtime-agnostic and already in deps.

**After Phase 0, remaining tokio in production code:**
- `#[tokio::main]` (1 site)
- `tokio::spawn` + `tokio::signal::ctrl_c` (1 function)
- `tokio::time::sleep` (1 import)

---

## Phase 0d: Error Type Migration (must precede adapter layer)

The adapter layer (Phase 1) uses `GitLabNetworkError { detail: Option<String> }`, which requires the error type change originally slated for Phase 4. Move this change up front so Phases 1-3 compile as a unit.
### `src/core/error.rs`

```rust
// Remove:
#[error("HTTP error: {0}")]
Http(#[from] reqwest::Error),

// Change:
#[error("Cannot connect to GitLab at {base_url}")]
GitLabNetworkError {
    base_url: String,
    // Before: source: Option<reqwest::Error>
    // After:
    detail: Option<String>,
},
```

The adapter layer stringifies HTTP client errors at the boundary so `LoreError` doesn't depend on any HTTP client's error types. This also means the existing reqwest call sites that construct `GitLabNetworkError` must be updated to pass `detail: Some(format!("{e:?}"))` instead of `source: Some(e)` -- but those sites are rewritten in Phase 2 anyway, so no extra work.

---

## Phase 1: Build the HTTP Adapter Layer

### Why

Asupersync's `HttpClient` is lower-level than reqwest:
- Headers: `Vec<(String, String)>` not typed `HeaderMap`/`HeaderValue`
- Body: `Vec<u8>` not a builder with `.json()`
- Status: raw `u16` not `StatusCode` enum
- Response: body already buffered, no async `.json().await`
- No per-request timeout

Without an adapter, every call site becomes 5-6 lines of boilerplate. The adapter also isolates gitlore from asupersync's pre-1.0 HTTP API.
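To make the boundary rule concrete, a small self-contained sketch of stringify-at-the-boundary, using stand-in types (`TransportError` and `stringify_at_boundary` are hypothetical; only the `GitLabNetworkError { base_url, detail }` shape comes from this plan):

```rust
// Hypothetical stand-in for the HTTP client's error type. In the real
// code this would be asupersync's transport error; LoreError never sees it.
#[derive(Debug)]
struct TransportError {
    kind: &'static str,
}

#[derive(Debug)]
enum LoreError {
    GitLabNetworkError { base_url: String, detail: Option<String> },
}

// Debug-format the client error exactly once, at the adapter boundary,
// so LoreError carries only a String and no HTTP-client types.
fn stringify_at_boundary(base_url: &str, e: TransportError) -> LoreError {
    LoreError::GitLabNetworkError {
        base_url: base_url.to_string(),
        detail: Some(format!("{e:?}")),
    }
}

fn main() {
    let err = stringify_at_boundary(
        "https://gitlab.example.com",
        TransportError { kind: "connect refused" },
    );
    match err {
        LoreError::GitLabNetworkError { base_url, detail } => {
            assert_eq!(base_url, "https://gitlab.example.com");
            // The transport detail survives as text for diagnostics...
            assert!(detail.unwrap().contains("connect refused"));
            // ...but the error enum compiles with zero HTTP-client deps.
        }
    }
}
```

Swapping HTTP clients later then touches only the adapter, not every `match` on `LoreError`.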
### New file: `src/http.rs` (~100 LOC)

```rust
use asupersync::http::h1::{HttpClient, HttpClientConfig, PoolConfig};
use asupersync::http::h1::types::Method;
use asupersync::time::timeout;
use serde::de::DeserializeOwned;
use serde::Serialize;
use std::time::Duration;

use crate::core::error::{LoreError, Result};

pub struct Client {
    inner: HttpClient,
    timeout: Duration,
}

pub struct Response {
    pub status: u16,
    pub reason: String,
    pub headers: Vec<(String, String)>,
    body: Vec<u8>,
}

impl Client {
    pub fn with_timeout(timeout: Duration) -> Self {
        Self {
            inner: HttpClient::with_config(HttpClientConfig {
                pool_config: PoolConfig::builder()
                    .max_connections_per_host(6)
                    .max_total_connections(100)
                    .idle_timeout(Duration::from_secs(90))
                    .build(),
                ..Default::default()
            }),
            timeout,
        }
    }

    pub async fn get(&self, url: &str, headers: &[(&str, &str)]) -> Result<Response> {
        self.execute(Method::Get, url, headers, Vec::new()).await
    }

    pub async fn get_with_query(
        &self,
        url: &str,
        params: &[(&str, String)],
        headers: &[(&str, &str)],
    ) -> Result<Response> {
        let full_url = append_query_params(url, params);
        self.execute(Method::Get, &full_url, headers, Vec::new()).await
    }

    pub async fn post_json<T: Serialize>(
        &self,
        url: &str,
        headers: &[(&str, &str)],
        body: &T,
    ) -> Result<Response> {
        let body_bytes = serde_json::to_vec(body)
            .map_err(|e| LoreError::Other(format!("JSON serialization failed: {e}")))?;
        let mut all_headers = headers.to_vec();
        all_headers.push(("Content-Type", "application/json"));
        self.execute(Method::Post, url, &all_headers, body_bytes).await
    }

    async fn execute(
        &self,
        method: Method,
        url: &str,
        headers: &[(&str, &str)],
        body: Vec<u8>,
    ) -> Result<Response> {
        let header_tuples: Vec<(String, String)> = headers
            .iter()
            .map(|(k, v)| ((*k).to_owned(), (*v).to_owned()))
            .collect();

        let raw = timeout(self.timeout, self.inner.request(method, url, header_tuples, body))
            .await
            .map_err(|_|
                LoreError::Other(format!("Request timed out after {:?}", self.timeout)))?
            .map_err(|e| LoreError::GitLabNetworkError {
                base_url: url.to_string(),
                detail: Some(format!("{e:?}")),
            })?;

        Ok(Response {
            status: raw.status,
            reason: raw.reason,
            headers: raw.headers,
            body: raw.body,
        })
    }
}

impl Response {
    pub fn is_success(&self) -> bool {
        (200..300).contains(&self.status)
    }

    pub fn json<T: DeserializeOwned>(&self) -> Result<T> {
        serde_json::from_slice(&self.body)
            .map_err(|e| LoreError::Other(format!("JSON parse error: {e}")))
    }

    pub fn text(self) -> Result<String> {
        String::from_utf8(self.body)
            .map_err(|e| LoreError::Other(format!("UTF-8 decode error: {e}")))
    }

    pub fn header(&self, name: &str) -> Option<&str> {
        self.headers
            .iter()
            .find(|(k, _)| k.eq_ignore_ascii_case(name))
            .map(|(_, v)| v.as_str())
    }

    /// Returns all values for a header name (case-insensitive).
    /// Needed for multi-value headers like `Link` used in pagination.
    pub fn headers_all(&self, name: &str) -> Vec<&str> {
        self.headers
            .iter()
            .filter(|(k, _)| k.eq_ignore_ascii_case(name))
            .map(|(_, v)| v.as_str())
            .collect()
    }
}

fn append_query_params(url: &str, params: &[(&str, String)]) -> String {
    if params.is_empty() {
        return url.to_string();
    }
    let query: String = params
        .iter()
        .map(|(k, v)| format!("{}={}", urlencoding::encode(k), urlencoding::encode(v)))
        .collect::<Vec<_>>()
        .join("&");
    if url.contains('?') {
        format!("{url}&{query}")
    } else {
        format!("{url}?{query}")
    }
}
```

### Timeout behavior

Every request is wrapped with `asupersync::time::timeout(self.timeout, ...)`. Default timeouts:
- GitLab REST/GraphQL: 30s
- Ollama: configurable (default 60s)
- Ollama health check: 5s

---

## Phase 2: Migrate the 3 HTTP Modules

### 2a. 
`gitlab/client.rs` (REST API)

**Imports:**

```rust
// Remove
use reqwest::header::{ACCEPT, HeaderMap, HeaderValue};
use reqwest::{Client, Response, StatusCode};

// Add
use crate::http::{Client, Response};
```

**Client construction** (lines 68-96):

```rust
// Before: reqwest::Client::builder().default_headers(h).timeout(d).build()
// After:
let client = Client::with_timeout(Duration::from_secs(30));
```

**request() method** (lines 129-170):

```rust
// Before
let response = self.client.get(&url)
    .header("PRIVATE-TOKEN", &self.token)
    .send().await
    .map_err(|e| LoreError::GitLabNetworkError { ... })?;

// After
let response = self.client.get(&url, &[
    ("PRIVATE-TOKEN", &self.token),
    ("Accept", "application/json"),
]).await?;
```

**request_with_headers() method** (lines 510-559):

```rust
// Before
let response = self.client.get(&url)
    .query(params)
    .header("PRIVATE-TOKEN", &self.token)
    .send().await?;
let headers = response.headers().clone();

// After
let response = self.client.get_with_query(&url, params, &[
    ("PRIVATE-TOKEN", &self.token),
    ("Accept", "application/json"),
]).await?;
// headers already owned in response.headers
```

**handle_response()** (lines 182-219):

```rust
// Before: async fn (consumed body with .text().await)
// After: sync fn (body already buffered in Response)
fn handle_response<T: DeserializeOwned>(&self, response: Response, path: &str) -> Result<T> {
    match response.status {
        401 => Err(LoreError::GitLabAuthFailed),
        404 => Err(LoreError::GitLabNotFound { resource: path.into() }),
        429 => {
            let retry_after = response.header("retry-after")
                .and_then(|v| v.parse().ok())
                .unwrap_or(60);
            Err(LoreError::GitLabRateLimited { retry_after })
        }
        s if (200..300).contains(&s) => response.json::<T>(),
        s => Err(LoreError::Other(format!("GitLab API error: {} {}", s, response.reason))),
    }
}
```

**Pagination** -- No structural changes. `async_stream::stream!` and header parsing stay the same. 
Only the response type changes: +```rust +// Before: headers.get("x-next-page").and_then(|v| v.to_str().ok()) +// After: response.header("x-next-page") +``` + +**parse_link_header_next** -- Change signature from `(headers: &HeaderMap)` to `(headers: &[(String, String)])` and find by case-insensitive name. + +### 2b. `gitlab/graphql.rs` + +```rust +// Before +let response = self.http.post(&url) + .header("Authorization", format!("Bearer {}", self.token)) + .header("Content-Type", "application/json") + .json(&body).send().await?; +let json: Value = response.json().await?; + +// After +let bearer = format!("Bearer {}", self.token); +let response = self.http.post_json(&url, &[ + ("Authorization", &bearer), +], &body).await?; +let json: Value = response.json()?; +``` + +Status matching changes from `response.status().as_u16()` to `response.status` (already u16). + +### 2c. `embedding/ollama.rs` + +```rust +// Health check +let response = self.client.get(&url, &[]).await?; +let tags: TagsResponse = response.json()?; + +// Embed batch +let response = self.client.post_json(&url, &[], &request).await?; +if !response.is_success() { + let status = response.status; // capture before .text() consumes response + let body = response.text()?; + return Err(LoreError::EmbeddingFailed { document_id: 0, reason: format!("HTTP {status}: {body}") }); +} +let embed_response: EmbedResponse = response.json()?; +``` + +**Standalone health check** (`check_ollama_health`): Currently creates a temporary `reqwest::Client`. Replace with temporary `crate::http::Client`: +```rust +pub async fn check_ollama_health(base_url: &str) -> bool { + let client = Client::with_timeout(Duration::from_secs(5)); + let url = format!("{base_url}/api/tags"); + client.get(&url, &[]).await.map_or(false, |r| r.is_success()) +} +``` + +--- + +## Phase 3: Swap the Runtime + Deep Cx Threading + +### 3a. 
Cargo.toml + +```toml +[dependencies] +# Remove: +# reqwest = { version = "0.12", features = ["json"] } +# tokio = { version = "1", features = ["rt-multi-thread", "macros", "time", "signal"] } + +# Add: +asupersync = { version = "0.2", features = ["tls", "tls-native-roots"] } + +# Keep unchanged: +async-stream = "0.3" +futures = { version = "0.3", default-features = false, features = ["alloc"] } +serde = { version = "1", features = ["derive"] } +serde_json = "1" +urlencoding = "2" + +[dev-dependencies] +tempfile = "3" +wiremock = "0.6" +tokio = { version = "1", features = ["rt", "macros"] } +``` + +### 3b. rust-toolchain.toml + +```toml +[toolchain] +channel = "nightly-2026-03-01" # Pin specific date to avoid surprise breakage +``` + +Update the date as needed when newer nightlies are verified. Never use bare `"nightly"` in production. + +### 3c. Entrypoint (`main.rs:53`) + +```rust +// Before +#[tokio::main] +async fn main() -> Result<()> { ... } + +// After +#[asupersync::main] +async fn main(cx: &Cx) -> Outcome<()> { ... } +``` + +### 3d. Signal handler (`core/shutdown.rs`) + +```rust +// After (Phase 0 extracted it; now rewrite for asupersync) +pub async fn install_ctrl_c_handler(cx: &Cx, signal: ShutdownSignal) { + cx.spawn("ctrl-c-handler", async move |cx| { + cx.shutdown_signal().await; + eprintln!("\nInterrupted, finishing current batch... (Ctrl+C again to force quit)"); + signal.cancel(); + // Preserve hard-exit on second Ctrl+C (same behavior as Phase 0a) + cx.shutdown_signal().await; + std::process::exit(130); + }); +} +``` + +### 3e. Rate limiter sleep + +```rust +// Before +use tokio::time::sleep; + +// After +use asupersync::time::sleep; +``` + +### 3f. Deep Cx threading + +Thread `Cx` from `main()` through command dispatch into the orchestrator and ingestion modules. This enables region-scoped cancellation for `join_all` batches. 
+ +**Function signatures that need `cx: &Cx` added:** + +| Module | Functions | +|--------|-----------| +| `main.rs` | Command dispatch match arms for `sync`, `ingest`, `embed` | +| `cli/commands/sync.rs` | `run_sync()` | +| `cli/commands/ingest.rs` | `run_ingest_command()`, `run_ingest()` | +| `cli/commands/embed.rs` | `run_embed()` | +| `cli/commands/sync_surgical.rs` | `run_sync_surgical()` | +| `ingestion/orchestrator.rs` | `ingest_issues()`, `ingest_merge_requests()`, `ingest_discussions()`, etc. | +| `ingestion/surgical.rs` | `surgical_sync()` | +| `embedding/pipeline.rs` | `embed_documents()`, `embed_batch_group()` | + +**Region wrapping for join_all batches** (orchestrator.rs): +```rust +// Before +let prefetched_batch = join_all(prefetch_futures).await; + +// After -- cancel-correct region with result collection +let (tx, rx) = std::sync::mpsc::channel(); +cx.region(|scope| async { + for future in prefetch_futures { + let tx = tx.clone(); + scope.spawn(async move |_cx| { + let result = future.await; + let _ = tx.send(result); + }); + } + drop(tx); +}).await; +let prefetched_batch: Vec<_> = rx.into_iter().collect(); +``` + +Note: The exact result-collection pattern depends on asupersync's region API. If `scope.spawn()` returns a `JoinHandle`, prefer collecting handles and awaiting them. The channel pattern above works regardless of API shape. + +This is the biggest payoff: if Ctrl+C fires during a prefetch batch, the region cancels all in-flight HTTP requests with bounded cleanup instead of silently dropping them. + +**Estimated signature changes:** ~15 functions gain a `cx: &Cx` parameter. + +--- + +## Phase 4: (Moved to Phase 0d) + +Error type migration was moved to Phase 0d to resolve a compile-order dependency: the adapter layer (Phase 1) uses the new `GitLabNetworkError { detail }` shape. + +--- + +## Phase 5: Test Migration + +### Keep on `#[tokio::test]` (wiremock tests -- 42 tests) + +No changes. 
`tokio` is in `[dev-dependencies]` with `features = ["rt", "macros"]`. + +| File | Tests | +|------|-------| +| `gitlab/graphql_tests.rs` | 30 | +| `gitlab/client_tests.rs` | 4 | +| `embedding/pipeline_tests.rs` | 4 | +| `ingestion/surgical_tests.rs` | 4 | + +### Switch to `#[asupersync::test]` (no wiremock -- 13 tests) + +| File | Tests | +|------|-------| +| `core/timeline_seed_tests.rs` | 13 | + +### Already `#[test]` (sync -- ~35 files) + +No changes needed. + +--- + +## Phase 6: Verify and Harden + +### Verification checklist + +```bash +cargo check --all-targets +cargo clippy --all-targets -- -D warnings +cargo fmt --check +cargo test +``` + +### Specific things to verify + +1. **async-stream on nightly** -- Does `async_stream 0.3` compile on current nightly? +2. **TLS root certs on macOS** -- Does `tls-native-roots` pick up system CA certs? +3. **Connection pool under concurrency** -- Do `join_all` batches (4-8 concurrent requests to same host) work without pool deadlock? +4. **Pagination streams** -- Do `async_stream::stream!` pagination generators work unchanged? +5. **Wiremock test isolation** -- Do wiremock tests pass with tokio only in dev-deps? + +### Reqwest behavioral differences to audit + +reqwest provides several implicit behaviors that asupersync's h1 client may not. Verify each: + +| reqwest default | gitlore relies on it? | asupersync equivalent | +|-----------------|----------------------|----------------------| +| Automatic redirect following (up to 10) | Unlikely (GitLab API doesn't redirect) | Verify: if 3xx is returned, does gitlore handle it? 
| +| Automatic gzip/deflate decompression | No (JSON responses are small) | Not needed | +| Proxy from `HTTP_PROXY`/`HTTPS_PROXY` env | Possibly (corporate environments) | Must verify asupersync proxy support | +| Connection keep-alive | Yes (pagination batches) | Covered by PoolConfig | +| System DNS resolution | Yes | Should be same (OS-level) | + +### Cancellation + DB transaction alignment + +Region-based cancellation stops HTTP tasks cleanly, but partial ingestion can leave the database in an inconsistent state if cancellation fires between "fetched data" and "wrote to DB". Verify: + +- All DB writes in ingestion batches use `unchecked_transaction()` (already the case for most ingestion paths) +- Transaction boundaries align with region scope: a cancelled region should not leave partial batch data committed +- The existing `ShutdownSignal` check-before-write pattern in orchestrator loops remains functional alongside region cancellation + +--- + +## File Change Summary + +| File | Change | LOC | +|------|--------|-----| +| `Cargo.toml` | Swap deps | ~10 | +| `rust-toolchain.toml` | NEW -- set nightly | 3 | +| `src/http.rs` | NEW -- adapter layer | ~100 | +| `src/main.rs` | Entrypoint macro, Cx threading, remove 4 signal handlers | ~40 | +| `src/core/shutdown.rs` | Extract + rewrite signal handler | ~20 | +| `src/core/error.rs` | Remove reqwest::Error, change GitLabNetworkError (Phase 0d) | ~10 | +| `src/gitlab/client.rs` | Replace reqwest, remove tokio imports, adapt all methods | ~80 | +| `src/gitlab/graphql.rs` | Replace reqwest | ~20 | +| `src/embedding/ollama.rs` | Replace reqwest | ~20 | +| `src/cli/commands/sync.rs` | Add Cx param | ~5 | +| `src/cli/commands/ingest.rs` | Add Cx param | ~5 | +| `src/cli/commands/embed.rs` | Add Cx param | ~5 | +| `src/cli/commands/sync_surgical.rs` | Add Cx param | ~5 | +| `src/ingestion/orchestrator.rs` | Add Cx param, region-wrap join_all | ~30 | +| `src/ingestion/surgical.rs` | Add Cx param | ~10 | +| 
`src/embedding/pipeline.rs` | Add Cx param | ~10 | +| `src/core/timeline_seed_tests.rs` | Swap test macro | ~13 | + +**Total: ~16 files modified, 1 new file, ~400-500 LOC changed.** + +--- + +## Execution Order + +``` +Phase 0a-0c (prep, safe, independent) + | + v +Phase 0d (error type migration -- required before adapter compiles) + | + v +Phase 1 (adapter layer, compiles but unused) ----+ + | | + v | These 3 are one +Phase 2 (migrate 3 HTTP modules to adapter) ------+ atomic commit + | | + v | +Phase 3 (swap runtime, Cx threading) ------------+ + | + v +Phase 5 (test migration) + | + v +Phase 6 (verify + harden) +``` + +Phase 0a-0c can be committed independently (good cleanup regardless). +Phase 0d (error types) can also land independently, but MUST precede the adapter layer. +Phases 1-3 must land together (removing reqwest requires both the adapter AND the new runtime). +Phases 5-6 are cleanup that can be incremental. + +--- + +## Risks + +| Risk | Severity | Mitigation | +|------|----------|------------| +| asupersync pre-1.0 API changes | High | Adapter layer isolates call sites. Pin exact version. | +| Nightly Rust breakage | Medium | Pin nightly date in rust-toolchain.toml. CI tests on nightly. | +| TLS cert issues on macOS | Medium | Test early in Phase 6. Fallback: `tls-webpki-roots` (Mozilla bundle). | +| Connection pool behavior under load | Medium | Stress test with `join_all` of 8+ concurrent requests in Phase 6. | +| async-stream nightly compat | Low | Widely used crate, likely fine. Fallback: manual Stream impl. | +| Build time increase | Low | Measure before/after. asupersync may be heavier than tokio. | +| Reqwest behavioral drift | Medium | reqwest has implicit redirect/proxy/compression handling. Audit each (see Phase 6 table). GitLab API doesn't redirect, so low actual risk. | +| Partial ingestion on cancel | Medium | Region cancellation can fire between HTTP fetch and DB write. 
Verify transaction boundaries align with region scope (see Phase 6). |
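For the last risk row, a minimal self-contained sketch of the check-before-write pattern (all names hypothetical; in gitlore the check guards the real DB transaction inside the orchestrator loops):

```rust
use std::sync::atomic::{AtomicBool, Ordering};

// Hypothetical sketch: consult the shutdown flag after each fetch and
// BEFORE opening the write transaction, so a cancelled run never commits
// a partially processed batch.
fn ingest_batches(batches: &[&str], cancelled: &AtomicBool) -> Vec<String> {
    let mut committed = Vec::new();
    for batch in batches {
        if cancelled.load(Ordering::SeqCst) {
            // Fetched-but-unwritten data is discarded; the DB stays at the
            // last fully committed batch boundary.
            break;
        }
        committed.push((*batch).to_string()); // stands in for the DB transaction
    }
    committed
}

fn main() {
    let flag = AtomicBool::new(false);
    assert_eq!(ingest_batches(&["a", "b"], &flag).len(), 2);
    flag.store(true, Ordering::SeqCst);
    assert!(ingest_batches(&["a", "b"], &flag).is_empty());
}
```

Region cancellation tightens this further: instead of relying on the loop reaching its next check, the region cancels in-flight fetches directly, and the Phase 6 audit verifies that transaction boundaries line up with the region scope.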