gitlore/plans/asupersync-migration.md
teernisse 77445f6903 docs(plans): add asupersync migration plan
Draft plan for replacing Tokio + Reqwest with Asupersync, a cancel-correct
async runtime with structured concurrency guarantees.

Motivation:
- Current Ctrl+C during join_all silently drops in-flight HTTP requests
- ShutdownSignal is a hand-rolled AtomicBool with no structured cancellation
- No deterministic testing for concurrent ingestion patterns
- Tokio provides no structured concurrency guarantees

Plan structure:
- Complete inventory of tokio/reqwest usage in production and test code
- Phase 0: Preparation (reduce tokio surface before swap)
  - Extract signal handler to single function
  - Replace tokio::sync::Mutex with std::sync::Mutex where appropriate
  - Create HTTP adapter trait for pluggable backends
- Phase 1-5: Progressive migration with detailed implementation steps

Trade-offs accepted:
- Nightly Rust required (asupersync dependency)
- Pre-1.0 runtime dependency (mitigated by adapter layer + version pinning)
- Deeper function signature changes for Cx threading

This is a reference document for future implementation, not an immediate
change to the runtime.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-03-06 11:15:58 -05:00


Plan: Replace Tokio + Reqwest with Asupersync

Date: 2026-03-06
Status: Draft
Decisions: Adapter layer (yes), timeouts in adapter, deep Cx threading, reference doc only


Context

Gitlore uses tokio as its async runtime and reqwest as its HTTP client. Both work, but:

  • Ctrl+C during join_all silently drops in-flight HTTP requests with no cleanup
  • ShutdownSignal is a hand-rolled AtomicBool with no structured cancellation
  • No deterministic testing for concurrent ingestion patterns
  • tokio provides no structured concurrency guarantees

Asupersync is a cancel-correct async runtime with region-owned tasks, obligation tracking, and deterministic lab testing. Replacing tokio+reqwest gives us structured shutdown, cancel-correct ingestion, and testable concurrency.

Trade-offs accepted:

  • Nightly Rust required (asupersync dependency)
  • Pre-1.0 runtime dependency (mitigated by adapter layer + version pinning)
  • Deeper function signature changes for Cx threading

Current Tokio Usage Inventory

Production code (must migrate)

| Location | API | Purpose |
| --- | --- | --- |
| main.rs:53 | #[tokio::main] | Runtime entrypoint |
| main.rs (4 sites) | tokio::spawn + tokio::signal::ctrl_c | Ctrl+C signal handlers |
| gitlab/client.rs:9 | tokio::sync::Mutex | Rate limiter lock |
| gitlab/client.rs:10 | tokio::time::sleep | Rate limiter backoff |
| gitlab/client.rs:729,736 | tokio::join! | Parallel pagination |

Production code (reqwest -- must replace)

| Location | Usage |
| --- | --- |
| gitlab/client.rs | REST API: GET with headers/query, response status/headers/JSON, pagination via x-next-page and Link headers, retry on 429 |
| gitlab/graphql.rs | GraphQL: POST with Bearer auth + JSON body, response JSON parsing |
| embedding/ollama.rs | Ollama: GET health check, POST JSON embedding requests |

Test code (keep on tokio via dev-dep)

| File | Tests | Uses wiremock? |
| --- | --- | --- |
| gitlab/graphql_tests.rs | 30 | Yes |
| gitlab/client_tests.rs | 4 | Yes |
| embedding/pipeline_tests.rs | 4 | Yes |
| ingestion/surgical_tests.rs | 4 async | Yes |

Test code (switch to asupersync)

| File | Tests | Why safe |
| --- | --- | --- |
| core/timeline_seed_tests.rs | 13 | Pure CPU/SQLite, no HTTP, no tokio APIs |

Test code (already sync #[test] -- no changes)

~35 test files across documents/, core/, embedding/, gitlab/transformers/, ingestion/, cli/commands/, tests/


Phase 0: Preparation (no runtime change)

Goal: Reduce tokio surface area before the swap. Each step is independently valuable.

0a. Extract signal handler

The 4 identical Ctrl+C handlers in main.rs (lines 1020, 2341, 2493, 2524) become one function in core/shutdown.rs:

pub fn install_ctrl_c_handler(signal: ShutdownSignal) {
    tokio::spawn(async move {
        let _ = tokio::signal::ctrl_c().await;
        eprintln!("\nInterrupted, finishing current batch... (Ctrl+C again to force quit)");
        signal.cancel();
        let _ = tokio::signal::ctrl_c().await;
        std::process::exit(130);
    });
}

4 spawn sites -> 1 function. The function body changes in Phase 3.

0b. Replace tokio::sync::Mutex with std::sync::Mutex

In gitlab/client.rs, the rate limiter lock guards a tiny sync critical section (check Instant::now(), compute delay). No async work inside the lock. std::sync::Mutex is correct and removes a tokio dependency:

// Before
use tokio::sync::Mutex;
let delay = self.rate_limiter.lock().await.check_delay();

// After
use std::sync::Mutex;
let delay = self.rate_limiter.lock().expect("rate limiter poisoned").check_delay();

Note: .expect() over .unwrap() for clarity. Poisoning is near-impossible here (the critical section is a trivial Instant::now() check), but the explicit message aids debugging if it ever fires.
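The reason a std::sync::Mutex is safe here is that the guard never lives across an .await: the delay is computed while the lock is held, the guard is dropped, and only then does the caller sleep. A minimal sketch of that shape follows. The RateLimiter fields and the next_delay helper are assumptions for illustration, not gitlore's actual definitions.

```rust
use std::sync::Mutex;
use std::time::{Duration, Instant};

/// Hypothetical rate limiter state; gitlore's real struct may differ.
pub struct RateLimiter {
    last_request: Option<Instant>,
    min_interval: Duration,
}

impl RateLimiter {
    pub fn new(min_interval: Duration) -> Self {
        Self { last_request: None, min_interval }
    }

    /// Returns how long the caller should wait before sending, and records
    /// the time at which the request is expected to go out.
    pub fn check_delay(&mut self, now: Instant) -> Duration {
        let delay = match self.last_request {
            Some(last) => self
                .min_interval
                .saturating_sub(now.saturating_duration_since(last)),
            None => Duration::ZERO,
        };
        self.last_request = Some(now + delay);
        delay
    }
}

/// Computes the delay while holding the lock, then releases it.
/// The guard is a temporary dropped at the end of the `let` statement,
/// so it is never held across a later `.await` on the sleep.
pub fn next_delay(limiter: &Mutex<RateLimiter>, now: Instant) -> Duration {
    let delay = limiter
        .lock()
        .expect("rate limiter poisoned")
        .check_delay(now);
    delay
}
```

The caller would then await the runtime's sleep on the returned Duration, with no lock held.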

0c. Replace tokio::join! with futures::join!

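In gitlab/client.rs:729,736. futures::join! is runtime-agnostic, and futures is already in deps. Note that the macro sits behind the futures crate's async-await feature, which the default-features = false declaration shown in 3a turns off; either add "async-await" to the feature list or call the futures::future::join function instead.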

After Phase 0, remaining tokio in production code:

  • #[tokio::main] (1 site)
  • tokio::spawn + tokio::signal::ctrl_c (1 function)
  • tokio::time::sleep (1 import)

Phase 0d: Error Type Migration (must precede adapter layer)

The adapter layer (Phase 1) uses GitLabNetworkError { detail: Option<String> }, which requires the error type change originally scheduled as Phase 4. Move this change up front so Phases 1-3 compile as a unit.

src/core/error.rs

// Remove:
#[error("HTTP error: {0}")]
Http(#[from] reqwest::Error),

// Change:
#[error("Cannot connect to GitLab at {base_url}")]
GitLabNetworkError {
    base_url: String,
    // Before: source: Option<reqwest::Error>
    // After:
    detail: Option<String>,
},

The adapter layer stringifies HTTP client errors at the boundary so LoreError doesn't depend on any HTTP client's error types. This also means the existing reqwest call sites that construct GitLabNetworkError must be updated to pass detail: Some(format!("{e:?}")) instead of source: Some(e) -- but those sites are rewritten in Phase 2 anyway, so no extra work.
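To make the boundary conversion concrete, here is a minimal sketch using a standalone struct in place of the LoreError variant. NetworkError and network_error are hypothetical names chosen for illustration; the real code would construct LoreError::GitLabNetworkError directly.

```rust
use std::error::Error;
use std::fmt;

/// Simplified stand-in for the `GitLabNetworkError` variant of LoreError.
#[derive(Debug)]
pub struct NetworkError {
    pub base_url: String,
    pub detail: Option<String>,
}

impl fmt::Display for NetworkError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "Cannot connect to GitLab at {}", self.base_url)
    }
}

impl Error for NetworkError {}

/// Hypothetical boundary helper: flattens any client error into a string
/// so the error type carries no HTTP-client-specific types.
pub fn network_error<E: fmt::Debug>(base_url: &str, err: &E) -> NetworkError {
    NetworkError {
        base_url: base_url.to_owned(),
        detail: Some(format!("{err:?}")),
    }
}
```

The cost of stringifying is that the original error is no longer reachable through Error::source(), so the Debug text in detail is the only diagnostic that survives the boundary.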


Phase 1: Build the HTTP Adapter Layer

Why

Asupersync's HttpClient is lower-level than reqwest:

  • Headers: Vec<(String, String)> not typed HeaderMap/HeaderValue
  • Body: Vec<u8> not a builder with .json()
  • Status: raw u16 not StatusCode enum
  • Response: body already buffered, no async .json().await
  • No per-request timeout

Without an adapter, every call site becomes 5-6 lines of boilerplate. The adapter also isolates gitlore from asupersync's pre-1.0 HTTP API.

New file: src/http.rs (~100 LOC)

use asupersync::http::h1::{HttpClient, HttpClientConfig, PoolConfig};
use asupersync::http::h1::types::Method;
use asupersync::time::timeout;
use serde::de::DeserializeOwned;
use serde::Serialize;
use std::time::Duration;

use crate::core::error::{LoreError, Result};

pub struct Client {
    inner: HttpClient,
    timeout: Duration,
}

pub struct Response {
    pub status: u16,
    pub reason: String,
    pub headers: Vec<(String, String)>,
    body: Vec<u8>,
}

impl Client {
    pub fn with_timeout(timeout: Duration) -> Self {
        Self {
            inner: HttpClient::with_config(HttpClientConfig {
                pool_config: PoolConfig::builder()
                    .max_connections_per_host(6)
                    .max_total_connections(100)
                    .idle_timeout(Duration::from_secs(90))
                    .build(),
                ..Default::default()
            }),
            timeout,
        }
    }

    pub async fn get(&self, url: &str, headers: &[(&str, &str)]) -> Result<Response> {
        self.execute(Method::Get, url, headers, Vec::new()).await
    }

    pub async fn get_with_query(
        &self,
        url: &str,
        params: &[(&str, String)],
        headers: &[(&str, &str)],
    ) -> Result<Response> {
        let full_url = append_query_params(url, params);
        self.execute(Method::Get, &full_url, headers, Vec::new()).await
    }

    pub async fn post_json<T: Serialize>(
        &self,
        url: &str,
        headers: &[(&str, &str)],
        body: &T,
    ) -> Result<Response> {
        let body_bytes = serde_json::to_vec(body)
            .map_err(|e| LoreError::Other(format!("JSON serialization failed: {e}")))?;
        let mut all_headers = headers.to_vec();
        all_headers.push(("Content-Type", "application/json"));
        self.execute(Method::Post, url, &all_headers, body_bytes).await
    }

    async fn execute(
        &self,
        method: Method,
        url: &str,
        headers: &[(&str, &str)],
        body: Vec<u8>,
    ) -> Result<Response> {
        let header_tuples: Vec<(String, String)> = headers
            .iter()
            .map(|(k, v)| ((*k).to_owned(), (*v).to_owned()))
            .collect();

        let raw = timeout(self.timeout, self.inner.request(method, url, header_tuples, body))
            .await
            .map_err(|_| LoreError::Other(format!("Request timed out after {:?}", self.timeout)))?
            .map_err(|e| LoreError::GitLabNetworkError {
                base_url: url.to_string(),
                detail: Some(format!("{e:?}")),
            })?;

        Ok(Response {
            status: raw.status,
            reason: raw.reason,
            headers: raw.headers,
            body: raw.body,
        })
    }
}

impl Response {
    pub fn is_success(&self) -> bool {
        (200..300).contains(&self.status)
    }

    pub fn json<T: DeserializeOwned>(&self) -> Result<T> {
        serde_json::from_slice(&self.body)
            .map_err(|e| LoreError::Other(format!("JSON parse error: {e}")))
    }

    pub fn text(self) -> Result<String> {
        String::from_utf8(self.body)
            .map_err(|e| LoreError::Other(format!("UTF-8 decode error: {e}")))
    }

    pub fn header(&self, name: &str) -> Option<&str> {
        self.headers
            .iter()
            .find(|(k, _)| k.eq_ignore_ascii_case(name))
            .map(|(_, v)| v.as_str())
    }

    /// Returns all values for a header name (case-insensitive).
    /// Needed for multi-value headers like `Link` used in pagination.
    pub fn headers_all(&self, name: &str) -> Vec<&str> {
        self.headers
            .iter()
            .filter(|(k, _)| k.eq_ignore_ascii_case(name))
            .map(|(_, v)| v.as_str())
            .collect()
    }
}

fn append_query_params(url: &str, params: &[(&str, String)]) -> String {
    if params.is_empty() {
        return url.to_string();
    }
    let query: String = params
        .iter()
        .map(|(k, v)| format!("{}={}", urlencoding::encode(k), urlencoding::encode(v)))
        .collect::<Vec<_>>()
        .join("&");
    if url.contains('?') {
        format!("{url}&{query}")
    } else {
        format!("{url}?{query}")
    }
}

Timeout behavior

Every request is wrapped with asupersync::time::timeout(self.timeout, ...). Default timeouts:

  • GitLab REST/GraphQL: 30s
  • Ollama: configurable (default 60s)
  • Ollama health check: 5s

Phase 2: Migrate the 3 HTTP Modules

2a. gitlab/client.rs (REST API)

Imports:

// Remove
use reqwest::header::{ACCEPT, HeaderMap, HeaderValue};
use reqwest::{Client, Response, StatusCode};

// Add
use crate::http::{Client, Response};

Client construction (lines 68-96):

// Before: reqwest::Client::builder().default_headers(h).timeout(d).build()
// After:
let client = Client::with_timeout(Duration::from_secs(30));

request() method (lines 129-170):

// Before
let response = self.client.get(&url)
    .header("PRIVATE-TOKEN", &self.token)
    .send().await
    .map_err(|e| LoreError::GitLabNetworkError { ... })?;

// After
let response = self.client.get(&url, &[
    ("PRIVATE-TOKEN", &self.token),
    ("Accept", "application/json"),
]).await?;

request_with_headers() method (lines 510-559):

// Before
let response = self.client.get(&url)
    .query(params)
    .header("PRIVATE-TOKEN", &self.token)
    .send().await?;
let headers = response.headers().clone();

// After
let response = self.client.get_with_query(&url, params, &[
    ("PRIVATE-TOKEN", &self.token),
    ("Accept", "application/json"),
]).await?;
// headers already owned in response.headers

handle_response() (lines 182-219):

// Before: async fn (consumed body with .text().await)
// After: sync fn (body already buffered in Response)
fn handle_response<T: DeserializeOwned>(&self, response: Response, path: &str) -> Result<T> {
    match response.status {
        401 => Err(LoreError::GitLabAuthFailed),
        404 => Err(LoreError::GitLabNotFound { resource: path.into() }),
        429 => {
            let retry_after = response.header("retry-after")
                .and_then(|v| v.parse().ok())
                .unwrap_or(60);
            Err(LoreError::GitLabRateLimited { retry_after })
        }
        s if (200..300).contains(&s) => response.json::<T>(),
        s => Err(LoreError::Other(format!("GitLab API error: {} {}", s, response.reason))),
    }
}

Pagination -- No structural changes. async_stream::stream! and header parsing stay the same. Only the response type changes:

// Before: headers.get("x-next-page").and_then(|v| v.to_str().ok())
// After:  response.header("x-next-page")

parse_link_header_next -- Change signature from (headers: &HeaderMap) to (headers: &[(String, String)]) and find by case-insensitive name.
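One possible shape for the new signature is sketched below. It assumes the Link header follows the usual `<url>; rel="next"` form with comma-separated entries and that URLs contain no literal commas; the existing parser in gitlab/client.rs may handle more cases.

```rust
/// Returns the URL tagged rel="next" from any `Link` header, if present.
/// Header names are matched case-insensitively.
pub fn parse_link_header_next(headers: &[(String, String)]) -> Option<String> {
    headers
        .iter()
        .filter(|(name, _)| name.eq_ignore_ascii_case("link"))
        // One header value may carry several comma-separated links.
        .flat_map(|(_, value)| value.split(','))
        .find_map(|link| {
            let mut parts = link.split(';');
            // First segment is the target, wrapped in angle brackets.
            let target = parts.next()?.trim();
            let url = target.strip_prefix('<')?.strip_suffix('>')?;
            // Remaining segments are parameters such as rel="next".
            let is_next = parts.any(|param| {
                let param = param.trim();
                param.eq_ignore_ascii_case("rel=\"next\"")
                    || param.eq_ignore_ascii_case("rel=next")
            });
            is_next.then(|| url.to_owned())
        })
}
```

Because it takes the plain header list, the function can be unit-tested without constructing an HTTP response.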

2b. gitlab/graphql.rs

// Before
let response = self.http.post(&url)
    .header("Authorization", format!("Bearer {}", self.token))
    .header("Content-Type", "application/json")
    .json(&body).send().await?;
let json: Value = response.json().await?;

// After
let bearer = format!("Bearer {}", self.token);
let response = self.http.post_json(&url, &[
    ("Authorization", &bearer),
], &body).await?;
let json: Value = response.json()?;

Status matching changes from response.status().as_u16() to response.status (already u16).

2c. embedding/ollama.rs

// Health check
let response = self.client.get(&url, &[]).await?;
let tags: TagsResponse = response.json()?;

// Embed batch
let response = self.client.post_json(&url, &[], &request).await?;
if !response.is_success() {
    let status = response.status; // capture before .text() consumes response
    let body = response.text()?;
    return Err(LoreError::EmbeddingFailed { document_id: 0, reason: format!("HTTP {status}: {body}") });
}
let embed_response: EmbedResponse = response.json()?;

Standalone health check (check_ollama_health): Currently creates a temporary reqwest::Client. Replace with temporary crate::http::Client:

pub async fn check_ollama_health(base_url: &str) -> bool {
    let client = Client::with_timeout(Duration::from_secs(5));
    let url = format!("{base_url}/api/tags");
    // is_ok_and avoids clippy's unnecessary_map_or warning under -D warnings
    client.get(&url, &[]).await.is_ok_and(|r| r.is_success())
}

Phase 3: Swap the Runtime + Deep Cx Threading

3a. Cargo.toml

[dependencies]
# Remove:
# reqwest = { version = "0.12", features = ["json"] }
# tokio = { version = "1", features = ["rt-multi-thread", "macros", "time", "signal"] }

# Add (note: "0.2" is a caret requirement; use an "=" requirement to pin an exact release):
asupersync = { version = "0.2", features = ["tls", "tls-native-roots"] }

# Keep unchanged:
async-stream = "0.3"
futures = { version = "0.3", default-features = false, features = ["alloc"] }
serde = { version = "1", features = ["derive"] }
serde_json = "1"
urlencoding = "2"

[dev-dependencies]
tempfile = "3"
wiremock = "0.6"
tokio = { version = "1", features = ["rt", "macros"] }

3b. rust-toolchain.toml

[toolchain]
channel = "nightly-2026-03-01"  # Pin specific date to avoid surprise breakage

Update the date as needed when newer nightlies are verified. Never use bare "nightly" in production.

3c. Entrypoint (main.rs:53)

// Before
#[tokio::main]
async fn main() -> Result<()> { ... }

// After
#[asupersync::main]
async fn main(cx: &Cx) -> Outcome<()> { ... }

3d. Signal handler (core/shutdown.rs)

// After (Phase 0 extracted it; now rewrite for asupersync)
pub async fn install_ctrl_c_handler(cx: &Cx, signal: ShutdownSignal) {
    cx.spawn("ctrl-c-handler", async move |cx| {
        cx.shutdown_signal().await;
        eprintln!("\nInterrupted, finishing current batch... (Ctrl+C again to force quit)");
        signal.cancel();
        // Preserve hard-exit on second Ctrl+C (same behavior as Phase 0a)
        cx.shutdown_signal().await;
        std::process::exit(130);
    });
}

3e. Rate limiter sleep

// Before
use tokio::time::sleep;

// After
use asupersync::time::sleep;

3f. Deep Cx threading

Thread Cx from main() through command dispatch into the orchestrator and ingestion modules. This enables region-scoped cancellation for join_all batches.

Function signatures that need cx: &Cx added:

| Module | Functions |
| --- | --- |
| main.rs | Command dispatch match arms for sync, ingest, embed |
| cli/commands/sync.rs | run_sync() |
| cli/commands/ingest.rs | run_ingest_command(), run_ingest() |
| cli/commands/embed.rs | run_embed() |
| cli/commands/sync_surgical.rs | run_sync_surgical() |
| ingestion/orchestrator.rs | ingest_issues(), ingest_merge_requests(), ingest_discussions(), etc. |
| ingestion/surgical.rs | surgical_sync() |
| embedding/pipeline.rs | embed_documents(), embed_batch_group() |

Region wrapping for join_all batches (orchestrator.rs):

// Before
let prefetched_batch = join_all(prefetch_futures).await;

// After -- cancel-correct region with result collection
let (tx, rx) = std::sync::mpsc::channel();
cx.region(|scope| async {
    for future in prefetch_futures {
        let tx = tx.clone();
        scope.spawn(async move |_cx| {
            let result = future.await;
            let _ = tx.send(result);
        });
    }
    drop(tx);
}).await;
let prefetched_batch: Vec<_> = rx.into_iter().collect();

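Note: The exact result-collection pattern depends on asupersync's region API. If scope.spawn() returns a JoinHandle<T>, prefer collecting handles and awaiting them in order. The channel pattern above works regardless of API shape, but it yields results in completion order, whereas join_all returns them in input order. If downstream code pairs results with inputs by position, send (index, result) and sort by index before use.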
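A channel delivers results as tasks finish, so positional order has to be restored explicitly. The sketch below shows index tagging with std threads standing in for scope.spawn; run_ordered is a hypothetical helper, and the asupersync version would spawn into the region instead.

```rust
use std::sync::mpsc;
use std::thread;

/// Runs each job on its own thread (a stand-in for `scope.spawn`) and
/// returns the results in input order, regardless of completion order.
pub fn run_ordered<T, F>(jobs: Vec<F>) -> Vec<T>
where
    T: Send + 'static,
    F: FnOnce() -> T + Send + 'static,
{
    let (tx, rx) = mpsc::channel();
    for (index, job) in jobs.into_iter().enumerate() {
        let tx = tx.clone();
        thread::spawn(move || {
            // Tag each result with its input position before sending.
            let _ = tx.send((index, job()));
        });
    }
    // Drop the original sender so the receiver ends once all workers finish.
    drop(tx);

    let mut tagged: Vec<(usize, T)> = rx.into_iter().collect();
    tagged.sort_by_key(|(index, _)| *index);
    tagged.into_iter().map(|(_, value)| value).collect()
}
```

If a worker panics or is cancelled, its slot is simply missing from the output, so callers that need one result per input should check the length.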

This is the biggest payoff: if Ctrl+C fires during a prefetch batch, the region cancels all in-flight HTTP requests with bounded cleanup instead of silently dropping them.

Estimated signature changes: ~15 functions gain a cx: &Cx parameter.


Phase 4: (Moved to Phase 0d)

Error type migration was moved to Phase 0d to resolve a compile-order dependency: the adapter layer (Phase 1) uses the new GitLabNetworkError { detail } shape.


Phase 5: Test Migration

Keep on #[tokio::test] (wiremock tests -- 42 tests)

No changes. tokio is in [dev-dependencies] with features = ["rt", "macros"].

| File | Tests |
| --- | --- |
| gitlab/graphql_tests.rs | 30 |
| gitlab/client_tests.rs | 4 |
| embedding/pipeline_tests.rs | 4 |
| ingestion/surgical_tests.rs | 4 |

Switch to #[asupersync::test] (no wiremock -- 13 tests)

| File | Tests |
| --- | --- |
| core/timeline_seed_tests.rs | 13 |

Already #[test] (sync -- ~35 files)

No changes needed.


Phase 6: Verify and Harden

Verification checklist

cargo check --all-targets
cargo clippy --all-targets -- -D warnings
cargo fmt --check
cargo test

Specific things to verify

  1. async-stream on nightly -- Does async_stream 0.3 compile on the nightly pinned in rust-toolchain.toml?
  2. TLS root certs on macOS -- Does tls-native-roots pick up system CA certs?
  3. Connection pool under concurrency -- Do join_all batches (4-8 concurrent requests to same host) work without pool deadlock?
  4. Pagination streams -- Do async_stream::stream! pagination generators work unchanged?
  5. Wiremock test isolation -- Do wiremock tests pass with tokio only in dev-deps?

Reqwest behavioral differences to audit

reqwest provides several implicit behaviors that asupersync's h1 client may not. Verify each:

| reqwest default | gitlore relies on it? | asupersync equivalent |
| --- | --- | --- |
| Automatic redirect following (up to 10) | Unlikely (GitLab API doesn't redirect) | Verify: if 3xx is returned, does gitlore handle it? |
| Automatic gzip/deflate decompression (opt-in via reqwest's gzip/deflate features; not enabled by the current features = ["json"]) | No (JSON responses are small) | Not needed |
| Proxy from HTTP_PROXY/HTTPS_PROXY env | Possibly (corporate environments) | Must verify asupersync proxy support |
| Connection keep-alive | Yes (pagination batches) | Covered by PoolConfig |
| System DNS resolution | Yes | Should be same (OS-level) |
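For the redirect row, the handle_response() sketch in Phase 2a would route any 3xx into its generic "GitLab API error" arm. If a clearer diagnostic is wanted, a small helper along these lines could surface the target; redirect_target is a hypothetical name, not part of the plan's adapter.

```rust
/// Hypothetical helper: returns the `Location` target of a 3xx response,
/// or None for non-redirect statuses and redirects without a Location.
pub fn redirect_target(status: u16, headers: &[(String, String)]) -> Option<&str> {
    if !(300..400).contains(&status) {
        return None;
    }
    headers
        .iter()
        .find(|(name, _)| name.eq_ignore_ascii_case("location"))
        .map(|(_, value)| value.as_str())
}
```

A caller could check this before the status match and report the unexpected redirect URL in the error message.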

Cancellation + DB transaction alignment

Region-based cancellation stops HTTP tasks cleanly, but partial ingestion can leave the database in an inconsistent state if cancellation fires between "fetched data" and "wrote to DB". Verify:

  • All DB writes in ingestion batches use unchecked_transaction() (already the case for most ingestion paths)
  • Transaction boundaries align with region scope: a cancelled region should not leave partial batch data committed
  • The existing ShutdownSignal check-before-write pattern in orchestrator loops remains functional alongside region cancellation
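The check-before-write pattern can be illustrated without a database. In this sketch a Vec stands in for the committed rows and each extend call plays the role of one committed transaction; ShutdownSignal is a minimal re-creation of the AtomicBool flag described earlier, and write_batches with its after_batch hook is hypothetical.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

/// Minimal stand-in for gitlore's ShutdownSignal (an AtomicBool flag).
#[derive(Clone, Default)]
pub struct ShutdownSignal(Arc<AtomicBool>);

impl ShutdownSignal {
    pub fn cancel(&self) {
        self.0.store(true, Ordering::SeqCst);
    }
    pub fn is_cancelled(&self) -> bool {
        self.0.load(Ordering::SeqCst)
    }
}

/// Writes whole batches only. `committed` stands in for the database.
/// `after_batch` is a hook used here to simulate Ctrl+C arriving mid-run.
/// Returns the number of batches written before cancellation was seen.
pub fn write_batches(
    signal: &ShutdownSignal,
    batches: &[Vec<u32>],
    committed: &mut Vec<u32>,
    mut after_batch: impl FnMut(usize),
) -> usize {
    let mut written = 0;
    for (i, batch) in batches.iter().enumerate() {
        // Check before starting a write, never in the middle of one.
        if signal.is_cancelled() {
            break;
        }
        committed.extend(batch);
        written += 1;
        after_batch(i);
    }
    written
}
```

The property to verify in the real code is the same: after cancellation, the database holds a whole number of batches.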

File Change Summary

| File | Change | LOC |
| --- | --- | --- |
| Cargo.toml | Swap deps | ~10 |
| rust-toolchain.toml | NEW -- set nightly | 3 |
| src/http.rs | NEW -- adapter layer | ~100 |
| src/main.rs | Entrypoint macro, Cx threading, remove 4 signal handlers | ~40 |
| src/core/shutdown.rs | Extract + rewrite signal handler | ~20 |
| src/core/error.rs | Remove reqwest::Error, change GitLabNetworkError (Phase 0d) | ~10 |
| src/gitlab/client.rs | Replace reqwest, remove tokio imports, adapt all methods | ~80 |
| src/gitlab/graphql.rs | Replace reqwest | ~20 |
| src/embedding/ollama.rs | Replace reqwest | ~20 |
| src/cli/commands/sync.rs | Add Cx param | ~5 |
| src/cli/commands/ingest.rs | Add Cx param | ~5 |
| src/cli/commands/embed.rs | Add Cx param | ~5 |
| src/cli/commands/sync_surgical.rs | Add Cx param | ~5 |
| src/ingestion/orchestrator.rs | Add Cx param, region-wrap join_all | ~30 |
| src/ingestion/surgical.rs | Add Cx param | ~10 |
| src/embedding/pipeline.rs | Add Cx param | ~10 |
| src/core/timeline_seed_tests.rs | Swap test macro | ~13 |

Total: 17 files touched (15 modified, 2 new: rust-toolchain.toml and src/http.rs), ~400-500 LOC changed.


Execution Order

Phase 0a-0c (prep, safe, independent)
  |
  v
Phase 0d (error type migration -- required before adapter compiles)
  |
  v
Phase 1 (adapter layer, compiles but unused) ----+
  |                                               |
  v                                               |  These 3 are one
Phase 2 (migrate 3 HTTP modules to adapter) ------+  atomic commit
  |                                               |
  v                                               |
Phase 3 (swap runtime, Cx threading) ------------+
  |
  v
Phase 5 (test migration)
  |
  v
Phase 6 (verify + harden)

Phase 0a-0c can be committed independently (good cleanup regardless). Phase 0d (error types) can also land independently, but MUST precede the adapter layer. Phases 1-3 must land together (removing reqwest requires both the adapter AND the new runtime). Phases 5-6 are cleanup that can be incremental.


Risks

| Risk | Severity | Mitigation |
| --- | --- | --- |
| asupersync pre-1.0 API changes | High | Adapter layer isolates call sites. Pin exact version. |
| Nightly Rust breakage | Medium | Pin nightly date in rust-toolchain.toml. CI tests on nightly. |
| TLS cert issues on macOS | Medium | Test early in Phase 6. Fallback: tls-webpki-roots (Mozilla bundle). |
| Connection pool behavior under load | Medium | Stress test with join_all of 8+ concurrent requests in Phase 6. |
| async-stream nightly compat | Low | Widely used crate, likely fine. Fallback: manual Stream impl. |
| Build time increase | Low | Measure before/after. asupersync may be heavier than tokio. |
| Reqwest behavioral drift | Medium | reqwest has implicit redirect/proxy/compression handling. Audit each (see Phase 6 table). GitLab API doesn't redirect, so low actual risk. |
| Partial ingestion on cancel | Medium | Region cancellation can fire between HTTP fetch and DB write. Verify transaction boundaries align with region scope (see Phase 6). |