# Plan: Replace Tokio + Reqwest with Asupersync

**Date:** 2026-03-06
**Status:** Draft
**Decisions:** Adapter layer (yes), timeouts in adapter, deep Cx threading, reference doc only

---

## Context

Gitlore uses tokio as its async runtime and reqwest as its HTTP client. Both work, but:

- Ctrl+C during `join_all` silently drops in-flight HTTP requests with no cleanup
- `ShutdownSignal` is a hand-rolled `AtomicBool` with no structured cancellation
- No deterministic testing for concurrent ingestion patterns
- tokio provides no structured concurrency guarantees

Asupersync is a cancel-correct async runtime with region-owned tasks, obligation tracking, and deterministic lab testing. Replacing tokio+reqwest gives us structured shutdown, cancel-correct ingestion, and testable concurrency.

**Trade-offs accepted:**

- Nightly Rust required (asupersync dependency)
- Pre-1.0 runtime dependency (mitigated by adapter layer + version pinning)
- Deeper function signature changes for Cx threading

---

## Current Tokio Usage Inventory

### Production code (must migrate)

| Location | API | Purpose |
|----------|-----|---------|
| `main.rs:53` | `#[tokio::main]` | Runtime entrypoint |
| `main.rs` (4 sites) | `tokio::spawn` + `tokio::signal::ctrl_c` | Ctrl+C signal handlers |
| `gitlab/client.rs:9` | `tokio::sync::Mutex` | Rate limiter lock |
| `gitlab/client.rs:10` | `tokio::time::sleep` | Rate limiter backoff |
| `gitlab/client.rs:729,736` | `tokio::join!` | Parallel pagination |

### Production code (reqwest -- must replace)

| Location | Usage |
|----------|-------|
| `gitlab/client.rs` | REST API: GET with headers/query, response status/headers/JSON, pagination via x-next-page and Link headers, retry on 429 |
| `gitlab/graphql.rs` | GraphQL: POST with Bearer auth + JSON body, response JSON parsing |
| `embedding/ollama.rs` | Ollama: GET health check, POST JSON embedding requests |

### Test code (keep on tokio via dev-dep)

| File | Tests | Uses wiremock? |
|------|-------|----------------|
| `gitlab/graphql_tests.rs` | 30 | Yes |
| `gitlab/client_tests.rs` | 4 | Yes |
| `embedding/pipeline_tests.rs` | 4 | Yes |
| `ingestion/surgical_tests.rs` | 4 async | Yes |

### Test code (switch to asupersync)

| File | Tests | Why safe |
|------|-------|----------|
| `core/timeline_seed_tests.rs` | 13 | Pure CPU/SQLite, no HTTP, no tokio APIs |

### Test code (already sync `#[test]` -- no changes)

~35 test files across documents/, core/, embedding/, gitlab/transformers/, ingestion/, cli/commands/, tests/

---

## Phase 0: Preparation (no runtime change)

Goal: Reduce tokio surface area before the swap. Each step is independently valuable.

### 0a. Extract signal handler

The 4 identical Ctrl+C handlers in `main.rs` (lines 1020, 2341, 2493, 2524) become one function in `core/shutdown.rs`:

```rust
pub fn install_ctrl_c_handler(signal: ShutdownSignal) {
    tokio::spawn(async move {
        let _ = tokio::signal::ctrl_c().await;
        eprintln!("\nInterrupted, finishing current batch... (Ctrl+C again to force quit)");
        signal.cancel();
        let _ = tokio::signal::ctrl_c().await;
        std::process::exit(130);
    });
}
```

4 spawn sites -> 1 function. The function body changes in Phase 3.

### 0b. Replace tokio::sync::Mutex with std::sync::Mutex

In `gitlab/client.rs`, the rate limiter lock guards a tiny sync critical section (check `Instant::now()`, compute delay). No async work happens inside the lock, so `std::sync::Mutex` is correct and removes a tokio dependency:

```rust
// Before
use tokio::sync::Mutex;
let delay = self.rate_limiter.lock().await.check_delay();

// After
use std::sync::Mutex;
let delay = self.rate_limiter.lock().expect("rate limiter poisoned").check_delay();
```

Note: `.expect()` over `.unwrap()` for clarity. Poisoning is near-impossible here (the critical section is a trivial `Instant::now()` check), but the explicit message aids debugging if it ever fires.

### 0c. Replace tokio::join! with futures::join!

In `gitlab/client.rs:729,736`.
`futures::join!` is runtime-agnostic and already in deps.

**After Phase 0, remaining tokio in production code:**

- `#[tokio::main]` (1 site)
- `tokio::spawn` + `tokio::signal::ctrl_c` (1 function)
- `tokio::time::sleep` (1 import)

---

## Phase 0d: Error Type Migration (must precede adapter layer)

The adapter layer (Phase 1) uses `GitLabNetworkError { detail: Option<String> }`, which requires the error type change from Phase 4. Move this change up front so Phases 1-3 compile as a unit.

### `src/core/error.rs`

```rust
// Remove:
#[error("HTTP error: {0}")]
Http(#[from] reqwest::Error),

// Change:
#[error("Cannot connect to GitLab at {base_url}")]
GitLabNetworkError {
    base_url: String,
    // Before: source: Option<reqwest::Error>
    // After:
    detail: Option<String>,
},
```

The adapter layer stringifies HTTP client errors at the boundary so `LoreError` doesn't depend on any HTTP client's error types. This also means the existing reqwest call sites that construct `GitLabNetworkError` must be updated to pass `detail: Some(format!("{e:?}"))` instead of `source: Some(e)` -- but those sites are rewritten in Phase 2 anyway, so no extra work.

---

## Phase 1: Build the HTTP Adapter Layer

### Why

Asupersync's `HttpClient` is lower-level than reqwest:

- Headers: `Vec<(String, String)>`, not typed `HeaderMap`/`HeaderValue`
- Body: `Vec<u8>`, not a builder with `.json()`
- Status: raw `u16`, not a `StatusCode` enum
- Response: body already buffered, no async `.json().await`
- No per-request timeout

Without an adapter, every call site becomes 5-6 lines of boilerplate. The adapter also isolates gitlore from asupersync's pre-1.0 HTTP API.
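The Phase 0d boundary rule -- stringify foreign errors before they reach `LoreError` -- can be rehearsed std-only. This is a sketch, not gitlore's actual code: `FakeTransportError` is a hypothetical stand-in for any HTTP client's error type, and the hand-rolled `Display` impl stands in for what `thiserror` derives. The real adapter applies the same `format!("{e:?}")` at its `map_err` boundary:

```rust
use std::fmt;

// Hypothetical stand-in for a third-party HTTP client's error type.
#[derive(Debug)]
struct FakeTransportError {
    kind: &'static str,
}

// Mirrors the post-Phase-0d shape: no foreign error types stored inside.
#[derive(Debug)]
enum LoreError {
    GitLabNetworkError {
        base_url: String,
        detail: Option<String>,
    },
}

impl fmt::Display for LoreError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            LoreError::GitLabNetworkError { base_url, .. } => {
                write!(f, "Cannot connect to GitLab at {base_url}")
            }
        }
    }
}

// The boundary: the foreign error is stringified once, never stored.
fn to_lore_error(base_url: &str, e: FakeTransportError) -> LoreError {
    LoreError::GitLabNetworkError {
        base_url: base_url.to_string(),
        detail: Some(format!("{e:?}")),
    }
}

fn main() {
    let err = to_lore_error("https://gitlab.example.com", FakeTransportError { kind: "dns" });
    println!("{err}");
}
```

The payoff is that swapping the HTTP client later only touches the `to_lore_error`-style boundary, not every `match` arm that inspects `LoreError`.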
### New file: `src/http.rs` (~100 LOC)

```rust
use asupersync::http::h1::{HttpClient, HttpClientConfig, PoolConfig};
use asupersync::http::h1::types::Method;
use asupersync::time::timeout;
use serde::de::DeserializeOwned;
use serde::Serialize;
use std::time::Duration;

use crate::core::error::{LoreError, Result};

pub struct Client {
    inner: HttpClient,
    timeout: Duration,
}

pub struct Response {
    pub status: u16,
    pub reason: String,
    pub headers: Vec<(String, String)>,
    body: Vec<u8>,
}

impl Client {
    pub fn with_timeout(timeout: Duration) -> Self {
        Self {
            inner: HttpClient::with_config(HttpClientConfig {
                pool_config: PoolConfig::builder()
                    .max_connections_per_host(6)
                    .max_total_connections(100)
                    .idle_timeout(Duration::from_secs(90))
                    .build(),
                ..Default::default()
            }),
            timeout,
        }
    }

    pub async fn get(&self, url: &str, headers: &[(&str, &str)]) -> Result<Response> {
        self.execute(Method::Get, url, headers, Vec::new()).await
    }

    pub async fn get_with_query(
        &self,
        url: &str,
        params: &[(&str, String)],
        headers: &[(&str, &str)],
    ) -> Result<Response> {
        let full_url = append_query_params(url, params);
        self.execute(Method::Get, &full_url, headers, Vec::new()).await
    }

    pub async fn post_json<T: Serialize>(
        &self,
        url: &str,
        headers: &[(&str, &str)],
        body: &T,
    ) -> Result<Response> {
        let body_bytes = serde_json::to_vec(body)
            .map_err(|e| LoreError::Other(format!("JSON serialization failed: {e}")))?;
        let mut all_headers = headers.to_vec();
        all_headers.push(("Content-Type", "application/json"));
        self.execute(Method::Post, url, &all_headers, body_bytes).await
    }

    async fn execute(
        &self,
        method: Method,
        url: &str,
        headers: &[(&str, &str)],
        body: Vec<u8>,
    ) -> Result<Response> {
        let header_tuples: Vec<(String, String)> = headers
            .iter()
            .map(|(k, v)| ((*k).to_owned(), (*v).to_owned()))
            .collect();

        let raw = timeout(self.timeout, self.inner.request(method, url, header_tuples, body))
            .await
            .map_err(|_| LoreError::Other(format!("Request timed out after {:?}", self.timeout)))?
            .map_err(|e| LoreError::GitLabNetworkError {
                base_url: url.to_string(),
                detail: Some(format!("{e:?}")),
            })?;

        Ok(Response {
            status: raw.status,
            reason: raw.reason,
            headers: raw.headers,
            body: raw.body,
        })
    }
}

impl Response {
    pub fn is_success(&self) -> bool {
        (200..300).contains(&self.status)
    }

    pub fn json<T: DeserializeOwned>(&self) -> Result<T> {
        serde_json::from_slice(&self.body)
            .map_err(|e| LoreError::Other(format!("JSON parse error: {e}")))
    }

    pub fn text(self) -> Result<String> {
        String::from_utf8(self.body)
            .map_err(|e| LoreError::Other(format!("UTF-8 decode error: {e}")))
    }

    pub fn header(&self, name: &str) -> Option<&str> {
        self.headers
            .iter()
            .find(|(k, _)| k.eq_ignore_ascii_case(name))
            .map(|(_, v)| v.as_str())
    }

    /// Returns all values for a header name (case-insensitive).
    /// Needed for multi-value headers like `Link` used in pagination.
    pub fn headers_all(&self, name: &str) -> Vec<&str> {
        self.headers
            .iter()
            .filter(|(k, _)| k.eq_ignore_ascii_case(name))
            .map(|(_, v)| v.as_str())
            .collect()
    }
}

fn append_query_params(url: &str, params: &[(&str, String)]) -> String {
    if params.is_empty() {
        return url.to_string();
    }
    let query: String = params
        .iter()
        .map(|(k, v)| format!("{}={}", urlencoding::encode(k), urlencoding::encode(v)))
        .collect::<Vec<_>>()
        .join("&");
    if url.contains('?') {
        format!("{url}&{query}")
    } else {
        format!("{url}?{query}")
    }
}
```

### Timeout behavior

Every request is wrapped with `asupersync::time::timeout(self.timeout, ...)`. Default timeouts:

- GitLab REST/GraphQL: 30s
- Ollama: configurable (default 60s)
- Ollama health check: 5s

---

## Phase 2: Migrate the 3 HTTP Modules

### 2a. `gitlab/client.rs` (REST API)

**Imports:**

```rust
// Remove
use reqwest::header::{ACCEPT, HeaderMap, HeaderValue};
use reqwest::{Client, Response, StatusCode};

// Add
use crate::http::{Client, Response};
```

**Client construction** (lines 68-96):

```rust
// Before: reqwest::Client::builder().default_headers(h).timeout(d).build()
// After:
let client = Client::with_timeout(Duration::from_secs(30));
```

**request() method** (lines 129-170):

```rust
// Before
let response = self.client.get(&url)
    .header("PRIVATE-TOKEN", &self.token)
    .send().await
    .map_err(|e| LoreError::GitLabNetworkError { ... })?;

// After
let response = self.client.get(&url, &[
    ("PRIVATE-TOKEN", &self.token),
    ("Accept", "application/json"),
]).await?;
```

**request_with_headers() method** (lines 510-559):

```rust
// Before
let response = self.client.get(&url)
    .query(params)
    .header("PRIVATE-TOKEN", &self.token)
    .send().await?;
let headers = response.headers().clone();

// After
let response = self.client.get_with_query(&url, params, &[
    ("PRIVATE-TOKEN", &self.token),
    ("Accept", "application/json"),
]).await?;
// headers already owned in response.headers
```

**handle_response()** (lines 182-219):

```rust
// Before: async fn (consumed body with .text().await)
// After: sync fn (body already buffered in Response)
fn handle_response<T: DeserializeOwned>(&self, response: Response, path: &str) -> Result<T> {
    match response.status {
        401 => Err(LoreError::GitLabAuthFailed),
        404 => Err(LoreError::GitLabNotFound { resource: path.into() }),
        429 => {
            let retry_after = response.header("retry-after")
                .and_then(|v| v.parse().ok())
                .unwrap_or(60);
            Err(LoreError::GitLabRateLimited { retry_after })
        }
        s if (200..300).contains(&s) => response.json::<T>(),
        s => Err(LoreError::Other(format!("GitLab API error: {} {}", s, response.reason))),
    }
}
```

**Pagination** -- No structural changes. `async_stream::stream!` and header parsing stay the same.
Only the response type changes:

```rust
// Before: headers.get("x-next-page").and_then(|v| v.to_str().ok())
// After:
response.header("x-next-page")
```

**parse_link_header_next** -- Change signature from `(headers: &HeaderMap)` to `(headers: &[(String, String)])` and find by case-insensitive name.

### 2b. `gitlab/graphql.rs`

```rust
// Before
let response = self.http.post(&url)
    .header("Authorization", format!("Bearer {}", self.token))
    .header("Content-Type", "application/json")
    .json(&body).send().await?;
let json: Value = response.json().await?;

// After
let bearer = format!("Bearer {}", self.token);
let response = self.http.post_json(&url, &[
    ("Authorization", &bearer),
], &body).await?;
let json: Value = response.json()?;
```

Status matching changes from `response.status().as_u16()` to `response.status` (already u16).

### 2c. `embedding/ollama.rs`

```rust
// Health check
let response = self.client.get(&url, &[]).await?;
let tags: TagsResponse = response.json()?;

// Embed batch
let response = self.client.post_json(&url, &[], &request).await?;
if !response.is_success() {
    let status = response.status; // capture before .text() consumes response
    let body = response.text()?;
    return Err(LoreError::EmbeddingFailed {
        document_id: 0,
        reason: format!("HTTP {status}: {body}"),
    });
}
let embed_response: EmbedResponse = response.json()?;
```

**Standalone health check** (`check_ollama_health`): Currently creates a temporary `reqwest::Client`. Replace with a temporary `crate::http::Client`:

```rust
pub async fn check_ollama_health(base_url: &str) -> bool {
    let client = Client::with_timeout(Duration::from_secs(5));
    let url = format!("{base_url}/api/tags");
    client.get(&url, &[]).await.map_or(false, |r| r.is_success())
}
```

---

## Phase 3: Swap the Runtime + Deep Cx Threading

### 3a. Cargo.toml

```toml
[dependencies]
# Remove:
# reqwest = { version = "0.12", features = ["json"] }
# tokio = { version = "1", features = ["rt-multi-thread", "macros", "time", "signal"] }

# Add:
asupersync = { version = "0.2", features = ["tls", "tls-native-roots"] }

# Keep unchanged:
async-stream = "0.3"
futures = { version = "0.3", default-features = false, features = ["alloc"] }
serde = { version = "1", features = ["derive"] }
serde_json = "1"
urlencoding = "2"

[dev-dependencies]
tempfile = "3"
wiremock = "0.6"
tokio = { version = "1", features = ["rt", "macros"] }
```

### 3b. rust-toolchain.toml

```toml
[toolchain]
channel = "nightly-2026-03-01" # Pin specific date to avoid surprise breakage
```

Update the date as needed when newer nightlies are verified. Never use bare `"nightly"` in production.

### 3c. Entrypoint (`main.rs:53`)

```rust
// Before
#[tokio::main]
async fn main() -> Result<()> { ... }

// After
#[asupersync::main]
async fn main(cx: &Cx) -> Outcome<()> { ... }
```

### 3d. Signal handler (`core/shutdown.rs`)

```rust
// After (Phase 0 extracted it; now rewrite for asupersync)
pub async fn install_ctrl_c_handler(cx: &Cx, signal: ShutdownSignal) {
    cx.spawn("ctrl-c-handler", async move |cx| {
        cx.shutdown_signal().await;
        eprintln!("\nInterrupted, finishing current batch... (Ctrl+C again to force quit)");
        signal.cancel();
        // Preserve hard-exit on second Ctrl+C (same behavior as Phase 0a)
        cx.shutdown_signal().await;
        std::process::exit(130);
    });
}
```

### 3e. Rate limiter sleep

```rust
// Before
use tokio::time::sleep;
// After
use asupersync::time::sleep;
```

### 3f. Deep Cx threading

Thread `Cx` from `main()` through command dispatch into the orchestrator and ingestion modules. This enables region-scoped cancellation for `join_all` batches.
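The fan-out-and-collect shape that region wrapping relies on (spawn each prefetch, funnel results through a channel, collect only after every task has finished) can be rehearsed with std-only primitives. This is a sketch: `std::thread::scope` is a stand-in for the region API, not asupersync's actual interface, and the arithmetic workload is a stand-in for an HTTP prefetch. The structural guarantee is the same one we want from regions -- no task can outlive the scope:

```rust
use std::sync::mpsc;
use std::thread;

fn main() {
    let inputs = vec![1u32, 2, 3, 4];
    let (tx, rx) = mpsc::channel();

    // Scoped spawn: all workers are joined before scope() returns,
    // mirroring a region that cannot leak tasks past its boundary.
    thread::scope(|scope| {
        for n in inputs {
            let tx = tx.clone();
            scope.spawn(move || {
                let result = n * 10; // stand-in for an HTTP prefetch
                let _ = tx.send(result);
            });
        }
    });
    drop(tx); // close the channel so the collector terminates

    let mut batch: Vec<u32> = rx.into_iter().collect();
    batch.sort(); // completion order is nondeterministic
    println!("{batch:?}");
}
```

Note the `drop(tx)` after the scope: `rx.into_iter()` only terminates once every sender is gone, which is the same channel-hygiene detail the region wrapping below Phase 3f must get right.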
**Function signatures that need `cx: &Cx` added:**

| Module | Functions |
|--------|-----------|
| `main.rs` | Command dispatch match arms for `sync`, `ingest`, `embed` |
| `cli/commands/sync.rs` | `run_sync()` |
| `cli/commands/ingest.rs` | `run_ingest_command()`, `run_ingest()` |
| `cli/commands/embed.rs` | `run_embed()` |
| `cli/commands/sync_surgical.rs` | `run_sync_surgical()` |
| `ingestion/orchestrator.rs` | `ingest_issues()`, `ingest_merge_requests()`, `ingest_discussions()`, etc. |
| `ingestion/surgical.rs` | `surgical_sync()` |
| `embedding/pipeline.rs` | `embed_documents()`, `embed_batch_group()` |

**Region wrapping for join_all batches** (orchestrator.rs):

```rust
// Before
let prefetched_batch = join_all(prefetch_futures).await;

// After -- cancel-correct region with result collection
let (tx, rx) = std::sync::mpsc::channel();
cx.region(|scope| async {
    for future in prefetch_futures {
        let tx = tx.clone();
        scope.spawn(async move |_cx| {
            let result = future.await;
            let _ = tx.send(result);
        });
    }
    drop(tx);
}).await;
let prefetched_batch: Vec<_> = rx.into_iter().collect();
```

Note: The exact result-collection pattern depends on asupersync's region API. If `scope.spawn()` returns a `JoinHandle`, prefer collecting handles and awaiting them. The channel pattern above works regardless of API shape.

This is the biggest payoff: if Ctrl+C fires during a prefetch batch, the region cancels all in-flight HTTP requests with bounded cleanup instead of silently dropping them.

**Estimated signature changes:** ~15 functions gain a `cx: &Cx` parameter.

---

## Phase 4: (Moved to Phase 0d)

Error type migration was moved to Phase 0d to resolve a compile-order dependency: the adapter layer (Phase 1) uses the new `GitLabNetworkError { detail }` shape.

---

## Phase 5: Test Migration

### Keep on `#[tokio::test]` (wiremock tests -- 42 tests)

No changes. `tokio` is in `[dev-dependencies]` with `features = ["rt", "macros"]`.
| File | Tests |
|------|-------|
| `gitlab/graphql_tests.rs` | 30 |
| `gitlab/client_tests.rs` | 4 |
| `embedding/pipeline_tests.rs` | 4 |
| `ingestion/surgical_tests.rs` | 4 |

### Switch to `#[asupersync::test]` (no wiremock -- 13 tests)

| File | Tests |
|------|-------|
| `core/timeline_seed_tests.rs` | 13 |

### Already `#[test]` (sync -- ~35 files)

No changes needed.

---

## Phase 6: Verify and Harden

### Verification checklist

```bash
cargo check --all-targets
cargo clippy --all-targets -- -D warnings
cargo fmt --check
cargo test
```

### Specific things to verify

1. **async-stream on nightly** -- Does `async_stream 0.3` compile on current nightly?
2. **TLS root certs on macOS** -- Does `tls-native-roots` pick up system CA certs?
3. **Connection pool under concurrency** -- Do `join_all` batches (4-8 concurrent requests to same host) work without pool deadlock?
4. **Pagination streams** -- Do `async_stream::stream!` pagination generators work unchanged?
5. **Wiremock test isolation** -- Do wiremock tests pass with tokio only in dev-deps?

### Reqwest behavioral differences to audit

reqwest provides several implicit behaviors that asupersync's h1 client may not. Verify each:

| reqwest default | gitlore relies on it? | asupersync equivalent |
|-----------------|----------------------|----------------------|
| Automatic redirect following (up to 10) | Unlikely (GitLab API doesn't redirect) | Verify: if 3xx is returned, does gitlore handle it? |
| Automatic gzip/deflate decompression | No (JSON responses are small) | Not needed |
| Proxy from `HTTP_PROXY`/`HTTPS_PROXY` env | Possibly (corporate environments) | Must verify asupersync proxy support |
| Connection keep-alive | Yes (pagination batches) | Covered by PoolConfig |
| System DNS resolution | Yes | Should be same (OS-level) |

### Cancellation + DB transaction alignment

Region-based cancellation stops HTTP tasks cleanly, but partial ingestion can leave the database in an inconsistent state if cancellation fires between "fetched data" and "wrote to DB". Verify:

- All DB writes in ingestion batches use `unchecked_transaction()` (already the case for most ingestion paths)
- Transaction boundaries align with region scope: a cancelled region should not leave partial batch data committed
- The existing `ShutdownSignal` check-before-write pattern in orchestrator loops remains functional alongside region cancellation

---

## File Change Summary

| File | Change | LOC |
|------|--------|-----|
| `Cargo.toml` | Swap deps | ~10 |
| `rust-toolchain.toml` | NEW -- set nightly | 3 |
| `src/http.rs` | NEW -- adapter layer | ~100 |
| `src/main.rs` | Entrypoint macro, Cx threading, remove 4 signal handlers | ~40 |
| `src/core/shutdown.rs` | Extract + rewrite signal handler | ~20 |
| `src/core/error.rs` | Remove reqwest::Error, change GitLabNetworkError (Phase 0d) | ~10 |
| `src/gitlab/client.rs` | Replace reqwest, remove tokio imports, adapt all methods | ~80 |
| `src/gitlab/graphql.rs` | Replace reqwest | ~20 |
| `src/embedding/ollama.rs` | Replace reqwest | ~20 |
| `src/cli/commands/sync.rs` | Add Cx param | ~5 |
| `src/cli/commands/ingest.rs` | Add Cx param | ~5 |
| `src/cli/commands/embed.rs` | Add Cx param | ~5 |
| `src/cli/commands/sync_surgical.rs` | Add Cx param | ~5 |
| `src/ingestion/orchestrator.rs` | Add Cx param, region-wrap join_all | ~30 |
| `src/ingestion/surgical.rs` | Add Cx param | ~10 |
| `src/embedding/pipeline.rs` | Add Cx param | ~10 |
| `src/core/timeline_seed_tests.rs` | Swap test macro | ~13 |

**Total: ~15 files modified, 2 new files, ~400-500 LOC changed.**

---

## Execution Order

```
Phase 0a-0c (prep, safe, independent)
        |
        v
Phase 0d (error type migration -- required before adapter compiles)
        |
        v
Phase 1 (adapter layer, compiles but unused) ----+
        |                                        |
        v                                        | These 3 are one
Phase 2 (migrate 3 HTTP modules to adapter) -----+ atomic commit
        |                                        |
        v                                        |
Phase 3 (swap runtime, Cx threading) ------------+
        |
        v
Phase 5 (test migration)
        |
        v
Phase 6 (verify + harden)
```

Phase 0a-0c can be committed independently (good cleanup regardless). Phase 0d (error types) can also land independently, but MUST precede the adapter layer. Phases 1-3 must land together (removing reqwest requires both the adapter AND the new runtime). Phases 5-6 are cleanup that can be incremental.

---

## Risks

| Risk | Severity | Mitigation |
|------|----------|------------|
| asupersync pre-1.0 API changes | High | Adapter layer isolates call sites. Pin exact version. |
| Nightly Rust breakage | Medium | Pin nightly date in rust-toolchain.toml. CI tests on nightly. |
| TLS cert issues on macOS | Medium | Test early in Phase 6. Fallback: `tls-webpki-roots` (Mozilla bundle). |
| Connection pool behavior under load | Medium | Stress test with `join_all` of 8+ concurrent requests in Phase 6. |
| async-stream nightly compat | Low | Widely used crate, likely fine. Fallback: manual Stream impl. |
| Build time increase | Low | Measure before/after. asupersync may be heavier than tokio. |
| Reqwest behavioral drift | Medium | reqwest has implicit redirect/proxy/compression handling. Audit each (see Phase 6 table). GitLab API doesn't redirect, so low actual risk. |
| Partial ingestion on cancel | Medium | Region cancellation can fire between HTTP fetch and DB write. Verify transaction boundaries align with region scope (see Phase 6). |