use std::collections::{BTreeMap, BTreeSet, HashMap}; use std::path::PathBuf; use std::sync::Arc; use std::sync::atomic::{AtomicUsize, Ordering}; use std::time::{Duration, Instant}; use clap::Args as ClapArgs; use futures::stream::{self, StreamExt}; use serde::{Deserialize, Serialize}; use tokio::sync::Semaphore; use crate::core::cache::{CacheManager, CacheMetadata, compute_hash, validate_alias}; use crate::core::config::{AuthType, Config, config_path, resolve_credential}; use crate::core::http::{AsyncHttpClient, ConditionalFetchResult}; use crate::core::indexer::{Format, build_index, detect_format, normalize_to_json}; use crate::core::network::{NetworkPolicy, resolve_policy}; use crate::core::spec::SpecIndex; use crate::errors::SwaggerCliError; use crate::output::robot; // --------------------------------------------------------------------------- // CLI arguments // --------------------------------------------------------------------------- /// Re-fetch and update a cached spec #[derive(Debug, ClapArgs)] pub struct Args { /// Alias to sync (required unless --all is used) pub alias: Option, /// Sync all cached specs #[arg(long)] pub all: bool, /// Check for changes without writing #[arg(long)] pub dry_run: bool, /// Re-fetch regardless of cache freshness #[arg(long)] pub force: bool, /// Include detailed change lists in output #[arg(long)] pub details: bool, /// Auth profile name from config #[arg(long)] pub auth: Option, /// Maximum concurrent sync jobs (only with --all) #[arg(long, default_value = "4")] pub jobs: usize, /// Maximum concurrent requests per host (only with --all) #[arg(long, default_value = "2")] pub per_host: usize, /// Maximum number of alias failures before aborting (only with --all) #[arg(long)] pub max_failures: Option, /// Resume a previously interrupted sync --all #[arg(long)] pub resume: bool, /// Allow private/internal host (repeatable) #[arg(long = "allow-private-host")] pub allow_private_host: Vec, } // 
--------------------------------------------------------------------------- // Diff types // --------------------------------------------------------------------------- const MAX_DETAIL_ITEMS: usize = 200; #[derive(Debug, Clone, Serialize)] struct EndpointKey { path: String, method: String, } #[derive(Debug, Clone, Serialize)] struct SchemaDiff { added: Vec, removed: Vec, } #[derive(Debug, Clone, Serialize)] struct EndpointDiff { added: Vec, removed: Vec, modified: Vec, } #[derive(Debug, Clone, Serialize)] struct ChangeSummary { endpoints_added: usize, endpoints_removed: usize, endpoints_modified: usize, schemas_added: usize, schemas_removed: usize, } #[derive(Debug, Clone, Serialize)] struct ChangeDetails { endpoints: EndpointDiff, schemas: SchemaDiff, truncated: bool, } // --------------------------------------------------------------------------- // Robot output // --------------------------------------------------------------------------- #[derive(Debug, Serialize)] struct SyncOutput { alias: String, changed: bool, reason: String, local_version: String, remote_version: Option, #[serde(skip_serializing_if = "Option::is_none")] changes: Option, #[serde(skip_serializing_if = "Option::is_none")] details: Option, dry_run: bool, } #[derive(Debug, Clone, Serialize)] struct AliasSyncResult { alias: String, status: String, changed: bool, #[serde(skip_serializing_if = "Option::is_none")] reason: Option, #[serde(skip_serializing_if = "Option::is_none")] error: Option, #[serde(skip_serializing_if = "Option::is_none")] local_version: Option, #[serde(skip_serializing_if = "Option::is_none")] remote_version: Option, #[serde(skip_serializing_if = "Option::is_none")] changes: Option, #[serde(skip_serializing_if = "Option::is_none")] details: Option, duration_ms: u64, } #[derive(Debug, Serialize)] struct SyncAllOutput { total: usize, succeeded: usize, failed: usize, skipped: usize, aborted: bool, results: Vec, dry_run: bool, } // 
--------------------------------------------------------------------------- // Checkpoint // --------------------------------------------------------------------------- const CHECKPOINT_FILE: &str = "sync-checkpoint.json"; #[derive(Debug, Clone, Serialize, Deserialize)] struct SyncCheckpoint { started_at: String, aliases_completed: Vec, aliases_failed: Vec, } fn load_checkpoint(cache_path: &std::path::Path) -> Option { let path = cache_path.join(CHECKPOINT_FILE); let bytes = std::fs::read(&path).ok()?; serde_json::from_slice(&bytes).ok() } fn save_checkpoint( cache_path: &std::path::Path, checkpoint: &SyncCheckpoint, ) -> Result<(), SwaggerCliError> { let path = cache_path.join(CHECKPOINT_FILE); let bytes = serde_json::to_vec_pretty(checkpoint) .map_err(|e| SwaggerCliError::Cache(format!("failed to serialize checkpoint: {e}")))?; std::fs::write(&path, bytes).map_err(|e| { SwaggerCliError::Cache(format!( "failed to write checkpoint {}: {e}", path.display() )) })?; Ok(()) } fn remove_checkpoint(cache_path: &std::path::Path) { let path = cache_path.join(CHECKPOINT_FILE); let _ = std::fs::remove_file(path); } // --------------------------------------------------------------------------- // Index diffing // --------------------------------------------------------------------------- /// Build a comparable key for an endpoint: (path, METHOD). /// Method is uppercased for consistent comparison regardless of indexer casing. fn endpoint_key(ep: &crate::core::spec::IndexedEndpoint) -> (String, String) { (ep.path.clone(), ep.method.to_uppercase()) } /// Build a fingerprint of an endpoint for modification detection. /// Includes all semantically meaningful fields so that changes to security, /// tags, operation_id, etc. are detected during sync. 
fn endpoint_fingerprint(ep: &crate::core::spec::IndexedEndpoint) -> String { let params: Vec = ep .parameters .iter() .map(|p| format!("{}:{}:{}", p.name, p.location, p.required)) .collect(); format!( "{}|{}|{}|{}|{}|{}|{}|{}|{}", ep.summary.as_deref().unwrap_or(""), ep.deprecated, params.join(","), ep.request_body_required, ep.request_body_content_types.join(","), ep.security_schemes.join(","), ep.security_required, ep.tags.join(","), ep.operation_id.as_deref().unwrap_or(""), ) } fn compute_diff(old: &SpecIndex, new: &SpecIndex) -> (ChangeSummary, ChangeDetails) { // Endpoint diff let old_keys: BTreeSet<(String, String)> = old.endpoints.iter().map(endpoint_key).collect(); let new_keys: BTreeSet<(String, String)> = new.endpoints.iter().map(endpoint_key).collect(); let old_fingerprints: BTreeMap<(String, String), String> = old .endpoints .iter() .map(|ep| (endpoint_key(ep), endpoint_fingerprint(ep))) .collect(); let new_fingerprints: BTreeMap<(String, String), String> = new .endpoints .iter() .map(|ep| (endpoint_key(ep), endpoint_fingerprint(ep))) .collect(); let added_keys: Vec<(String, String)> = new_keys.difference(&old_keys).cloned().collect(); let removed_keys: Vec<(String, String)> = old_keys.difference(&new_keys).cloned().collect(); let common_keys: BTreeSet<&(String, String)> = old_keys.intersection(&new_keys).collect(); let modified_keys: Vec<(String, String)> = common_keys .into_iter() .filter(|k| old_fingerprints.get(*k) != new_fingerprints.get(*k)) .cloned() .collect(); // Schema diff let old_schemas: BTreeSet = old.schemas.iter().map(|s| s.name.clone()).collect(); let new_schemas: BTreeSet = new.schemas.iter().map(|s| s.name.clone()).collect(); let schemas_added: Vec = new_schemas.difference(&old_schemas).cloned().collect(); let schemas_removed: Vec = old_schemas.difference(&new_schemas).cloned().collect(); let total_items = added_keys.len() + removed_keys.len() + modified_keys.len() + schemas_added.len() + schemas_removed.len(); let truncated = 
total_items > MAX_DETAIL_ITEMS; let summary = ChangeSummary { endpoints_added: added_keys.len(), endpoints_removed: removed_keys.len(), endpoints_modified: modified_keys.len(), schemas_added: schemas_added.len(), schemas_removed: schemas_removed.len(), }; let to_endpoint_keys = |keys: Vec<(String, String)>, limit: usize| -> Vec { keys.into_iter() .take(limit) .map(|(path, method)| EndpointKey { path, method }) .collect() }; let details = ChangeDetails { endpoints: EndpointDiff { added: to_endpoint_keys(added_keys, MAX_DETAIL_ITEMS), removed: to_endpoint_keys(removed_keys, MAX_DETAIL_ITEMS), modified: to_endpoint_keys(modified_keys, MAX_DETAIL_ITEMS), }, schemas: SchemaDiff { added: schemas_added.into_iter().take(MAX_DETAIL_ITEMS).collect(), removed: schemas_removed.into_iter().take(MAX_DETAIL_ITEMS).collect(), }, truncated, }; (summary, details) } // --------------------------------------------------------------------------- // Per-host semaphore map // --------------------------------------------------------------------------- fn extract_host(url: &str) -> String { reqwest::Url::parse(url) .ok() .and_then(|u| u.host_str().map(String::from)) .unwrap_or_default() } struct PerHostThrottle { semaphores: tokio::sync::Mutex>>, max_per_host: usize, } impl PerHostThrottle { fn new(max_per_host: usize) -> Self { Self { semaphores: tokio::sync::Mutex::new(HashMap::new()), max_per_host, } } async fn acquire(&self, host: &str) -> tokio::sync::OwnedSemaphorePermit { let sem = { let mut map = self.semaphores.lock().await; map.entry(host.to_string()) .or_insert_with(|| Arc::new(Semaphore::new(self.max_per_host))) .clone() }; sem.acquire_owned() .await .expect("semaphore should not be closed") } } // --------------------------------------------------------------------------- // Core single-alias sync logic (testable with explicit cache path) // --------------------------------------------------------------------------- /// Sync a single alias. 
Returns a structured result for use by both single /// and batch modes. #[allow(clippy::too_many_arguments)] async fn sync_one_alias( alias: &str, cache_path: &std::path::Path, dry_run: bool, force: bool, include_details: bool, auth: Option<&str>, network_policy: NetworkPolicy, config_override: Option<&std::path::Path>, per_host_throttle: Option<&PerHostThrottle>, allowed_private_hosts: &[String], ) -> AliasSyncResult { let start = Instant::now(); match sync_one_alias_inner( alias, cache_path, dry_run, force, include_details, auth, network_policy, config_override, per_host_throttle, allowed_private_hosts, ) .await { Ok(result) => result, Err(e) => AliasSyncResult { alias: alias.to_string(), status: "failed".to_string(), changed: false, reason: None, error: Some(e.to_string()), local_version: None, remote_version: None, changes: None, details: None, duration_ms: start.elapsed().as_millis().min(u64::MAX as u128) as u64, }, } } #[allow(clippy::too_many_arguments)] async fn sync_one_alias_inner( alias: &str, cache_path: &std::path::Path, dry_run: bool, force: bool, include_details: bool, auth: Option<&str>, network_policy: NetworkPolicy, config_override: Option<&std::path::Path>, per_host_throttle: Option<&PerHostThrottle>, allowed_private_hosts: &[String], ) -> Result { let start = Instant::now(); let cm = CacheManager::new(cache_path.to_path_buf()); validate_alias(alias)?; let (old_index, meta) = cm.load_index(alias)?; let url = meta.url.clone().ok_or_else(|| { SwaggerCliError::Usage(format!( "alias '{alias}' has no URL (fetched from stdin/file). Cannot sync." 
)) })?; // Build HTTP client let cfg = Config::load(&config_path(config_override))?; let mut builder = AsyncHttpClient::builder() .allow_insecure_http(url.starts_with("http://")) .allowed_private_hosts(allowed_private_hosts.to_vec()) .network_policy(network_policy); if let Some(profile_name) = auth { let profile = cfg.auth_profiles.get(profile_name).ok_or_else(|| { SwaggerCliError::Auth(format!("auth profile '{profile_name}' not found in config")) })?; let credential = resolve_credential(&profile.credential)?; match &profile.auth_type { AuthType::Bearer => { builder = builder .auth_header("Authorization".to_string(), format!("Bearer {credential}")); } AuthType::ApiKey { header } => { builder = builder.auth_header(header.clone(), credential); } } } let client = builder.build(); // Conditional fetch let (etag, last_modified) = if force { (None, None) } else { (meta.etag.as_deref(), meta.last_modified.as_deref()) }; // Acquire per-host permit if throttling is enabled let _host_permit = if let Some(throttle) = per_host_throttle { let host = extract_host(&url); Some(throttle.acquire(&host).await) } else { None }; let fetch_result = client.fetch_conditional(&url, etag, last_modified).await?; let elapsed_ms = || start.elapsed().as_millis().min(u64::MAX as u128) as u64; match fetch_result { ConditionalFetchResult::NotModified => Ok(AliasSyncResult { alias: alias.to_string(), status: "success".to_string(), changed: false, reason: Some("304 Not Modified".to_string()), error: None, local_version: Some(meta.spec_version.clone()), remote_version: None, changes: None, details: None, duration_ms: elapsed_ms(), }), ConditionalFetchResult::Modified(result) => { let new_content_hash = compute_hash(&result.bytes); if new_content_hash == meta.content_hash && !force { return Ok(AliasSyncResult { alias: alias.to_string(), status: "success".to_string(), changed: false, reason: Some("content hash unchanged".to_string()), error: None, local_version: Some(meta.spec_version.clone()), 
remote_version: None, changes: None, details: None, duration_ms: elapsed_ms(), }); } let format = detect_format(&result.bytes, Some(&url), result.content_type.as_deref()); let format_str = match format { Format::Json => "json", Format::Yaml => "yaml", }; let (json_bytes, value) = normalize_to_json(&result.bytes, format)?; let new_index = build_index(&value, &new_content_hash, meta.generation + 1)?; let (summary, details) = compute_diff(&old_index, &new_index); let has_changes = summary.endpoints_added > 0 || summary.endpoints_removed > 0 || summary.endpoints_modified > 0 || summary.schemas_added > 0 || summary.schemas_removed > 0; let changed = new_content_hash != meta.content_hash || has_changes; if !dry_run && changed { cm.write_cache( alias, &result.bytes, &json_bytes, &new_index, Some(url.clone()), &new_index.info.version, &new_index.info.title, format_str, result.etag.clone(), result.last_modified.clone(), Some(meta.generation), )?; } let reason = if changed { "content changed" } else { "no changes detected" }; Ok(AliasSyncResult { alias: alias.to_string(), status: "success".to_string(), changed, reason: Some(reason.to_string()), error: None, local_version: Some(meta.spec_version.clone()), remote_version: Some(new_index.info.version.clone()), changes: if include_details { Some(summary) } else { None }, details: if include_details { Some(details) } else { None }, duration_ms: elapsed_ms(), }) } } } // --------------------------------------------------------------------------- // Legacy sync_inner (single alias via Args) // --------------------------------------------------------------------------- async fn sync_inner( args: &Args, cache_path: PathBuf, robot_mode: bool, network_policy: NetworkPolicy, config_override: Option<&std::path::Path>, ) -> Result<(), SwaggerCliError> { let alias = args .alias .as_deref() .ok_or_else(|| SwaggerCliError::Usage("alias is required when not using --all".into()))?; let start = Instant::now(); let cm = 
CacheManager::new(cache_path.clone()); validate_alias(alias)?; // 1. Load existing metadata and index let (old_index, meta) = cm.load_index(alias)?; let url = meta.url.clone().ok_or_else(|| { SwaggerCliError::Usage(format!( "alias '{alias}' has no URL (fetched from stdin/file). Cannot sync.", )) })?; // 2. Build HTTP client let cfg = Config::load(&config_path(config_override))?; let mut builder = AsyncHttpClient::builder() .allow_insecure_http(url.starts_with("http://")) .allowed_private_hosts(args.allow_private_host.clone()) .network_policy(network_policy); if let Some(profile_name) = &args.auth { let profile = cfg.auth_profiles.get(profile_name).ok_or_else(|| { SwaggerCliError::Auth(format!("auth profile '{profile_name}' not found in config")) })?; let credential = resolve_credential(&profile.credential)?; match &profile.auth_type { AuthType::Bearer => { builder = builder .auth_header("Authorization".to_string(), format!("Bearer {credential}")); } AuthType::ApiKey { header } => { builder = builder.auth_header(header.clone(), credential); } } } let client = builder.build(); // 3. Conditional fetch let (etag, last_modified) = if args.force { (None, None) } else { (meta.etag.as_deref(), meta.last_modified.as_deref()) }; let fetch_result = client.fetch_conditional(&url, etag, last_modified).await?; match fetch_result { ConditionalFetchResult::NotModified => { output_no_changes( alias, args.dry_run, &meta, "304 Not Modified", robot_mode, start.elapsed(), ); return Ok(()); } ConditionalFetchResult::Modified(result) => { // 4. Check content hash let new_content_hash = compute_hash(&result.bytes); if new_content_hash == meta.content_hash && !args.force { output_no_changes( alias, args.dry_run, &meta, "content hash unchanged", robot_mode, start.elapsed(), ); return Ok(()); } // 5. 
Normalize and build index let format = detect_format(&result.bytes, Some(&url), result.content_type.as_deref()); let format_str = match format { Format::Json => "json", Format::Yaml => "yaml", }; let (json_bytes, value) = normalize_to_json(&result.bytes, format)?; let new_index = build_index(&value, &new_content_hash, meta.generation + 1)?; // 6. Compute diff let (summary, details) = compute_diff(&old_index, &new_index); let has_changes = summary.endpoints_added > 0 || summary.endpoints_removed > 0 || summary.endpoints_modified > 0 || summary.schemas_added > 0 || summary.schemas_removed > 0; let changed = new_content_hash != meta.content_hash || has_changes; // 7. Write cache (unless dry-run) if !args.dry_run && changed { cm.write_cache( alias, &result.bytes, &json_bytes, &new_index, Some(url.clone()), &new_index.info.version, &new_index.info.title, format_str, result.etag.clone(), result.last_modified.clone(), Some(meta.generation), )?; } // 8. Output output_changes( alias, args.dry_run, args.details, &meta, &new_index, changed, &summary, &details, robot_mode, start.elapsed(), ); } } Ok(()) } fn output_no_changes( alias: &str, dry_run: bool, meta: &CacheMetadata, reason: &str, robot_mode: bool, duration: Duration, ) { if robot_mode { let output = SyncOutput { alias: alias.to_string(), changed: false, reason: reason.to_string(), local_version: meta.spec_version.clone(), remote_version: None, changes: None, details: None, dry_run, }; robot::robot_success(output, "sync", duration); } else { println!("'{alias}' is up to date ({reason})"); } } #[allow(clippy::too_many_arguments)] fn output_changes( alias: &str, dry_run: bool, include_details: bool, old_meta: &CacheMetadata, new_index: &SpecIndex, changed: bool, summary: &ChangeSummary, details: &ChangeDetails, robot_mode: bool, duration: Duration, ) { if robot_mode { let output = SyncOutput { alias: alias.to_string(), changed, reason: if changed { "content changed".to_string() } else { "no changes detected".to_string() 
}, local_version: old_meta.spec_version.clone(), remote_version: Some(new_index.info.version.clone()), changes: Some(summary.clone()), details: if include_details { Some(details.clone()) } else { None }, dry_run, }; robot::robot_success(output, "sync", duration); } else if changed { let prefix = if dry_run { "[dry-run] " } else { "" }; println!( "{prefix}'{alias}' has changes (v{} -> v{})", old_meta.spec_version, new_index.info.version ); println!( " Endpoints: +{} -{} ~{}", summary.endpoints_added, summary.endpoints_removed, summary.endpoints_modified ); println!( " Schemas: +{} -{}", summary.schemas_added, summary.schemas_removed ); if dry_run { println!(" (dry run -- no changes written)"); } } else { println!("'{alias}' is up to date (content unchanged)"); } } // --------------------------------------------------------------------------- // sync --all implementation // --------------------------------------------------------------------------- async fn sync_all_inner( args: &Args, cache_path: PathBuf, robot_mode: bool, network_policy: NetworkPolicy, config_override: Option<&std::path::Path>, ) -> Result<(), SwaggerCliError> { let start = Instant::now(); let cm = CacheManager::new(cache_path.clone()); let all_metas = cm.list_aliases()?; // Filter to aliases that have URLs (can be synced remotely) let mut aliases: Vec = all_metas .iter() .filter(|m| m.url.is_some()) .map(|m| m.alias.clone()) .collect(); aliases.sort(); // Load checkpoint if resuming let checkpoint = if args.resume { load_checkpoint(&cache_path) } else { None }; let completed_set: BTreeSet = checkpoint .as_ref() .map(|c| c.aliases_completed.iter().cloned().collect()) .unwrap_or_default(); let failed_set: BTreeSet = checkpoint .as_ref() .map(|c| c.aliases_failed.iter().cloned().collect()) .unwrap_or_default(); // When resuming, skip already completed aliases let to_sync: Vec = if args.resume { aliases .iter() .filter(|a| !completed_set.contains(a.as_str()) && !failed_set.contains(a.as_str())) 
.cloned() .collect() } else { aliases.clone() }; let total = aliases.len(); // Handle empty aliases if total == 0 { let output = SyncAllOutput { total: 0, succeeded: 0, failed: 0, skipped: 0, aborted: false, results: vec![], dry_run: args.dry_run, }; if robot_mode { robot::robot_success(output, "sync", start.elapsed()); } else { println!("No aliases with URLs to sync."); } return Ok(()); } // Initialize checkpoint let checkpoint_state = Arc::new(tokio::sync::Mutex::new(SyncCheckpoint { started_at: chrono::Utc::now().to_rfc3339(), aliases_completed: completed_set.iter().cloned().collect(), aliases_failed: failed_set.iter().cloned().collect(), })); // Concurrency controls let jobs = args.jobs.max(1); let global_sem = Arc::new(Semaphore::new(jobs)); let per_host_throttle = Arc::new(PerHostThrottle::new(args.per_host.max(1))); let failure_count = Arc::new(AtomicUsize::new(0)); let max_failures = args.max_failures; let aborted = Arc::new(std::sync::atomic::AtomicBool::new(false)); let auth = args.auth.clone(); let dry_run = args.dry_run; let force = args.force; let details = args.details; let allowed_private_hosts = args.allow_private_host.clone(); let results: Vec = stream::iter(to_sync) .map(|alias| { let global_sem = global_sem.clone(); let per_host_throttle = per_host_throttle.clone(); let failure_count = failure_count.clone(); let aborted = aborted.clone(); let cache_path = cache_path.clone(); let auth = auth.clone(); let checkpoint_state = checkpoint_state.clone(); let allowed_private_hosts = allowed_private_hosts.clone(); async move { // Check if we have been aborted if aborted.load(Ordering::Relaxed) { return AliasSyncResult { alias: alias.clone(), status: "skipped".to_string(), changed: false, reason: Some("aborted due to failure budget".to_string()), error: None, local_version: None, remote_version: None, changes: None, details: None, duration_ms: 0, }; } // Acquire global concurrency permit let _permit = global_sem .acquire() .await .expect("semaphore should 
not be closed"); // Double-check abort after acquiring permit if aborted.load(Ordering::Relaxed) { return AliasSyncResult { alias: alias.clone(), status: "skipped".to_string(), changed: false, reason: Some("aborted due to failure budget".to_string()), error: None, local_version: None, remote_version: None, changes: None, details: None, duration_ms: 0, }; } let result = sync_one_alias( &alias, &cache_path, dry_run, force, details, auth.as_deref(), network_policy, config_override, Some(&per_host_throttle), &allowed_private_hosts, ) .await; // Update checkpoint { let mut cp = checkpoint_state.lock().await; if result.status == "success" { cp.aliases_completed.push(alias.clone()); } else { cp.aliases_failed.push(alias.clone()); } let _ = save_checkpoint(&cache_path, &cp); } // Track failures if result.status == "failed" { let count = failure_count.fetch_add(1, Ordering::Relaxed) + 1; if let Some(max) = max_failures && count >= max { aborted.store(true, Ordering::Relaxed); } } result } }) .buffer_unordered(jobs) .collect() .await; // Build results including resumed entries let mut all_results: Vec = Vec::new(); // Add previously-completed entries from resume if args.resume { for alias in &completed_set { all_results.push(AliasSyncResult { alias: alias.clone(), status: "skipped".to_string(), changed: false, reason: Some("already completed (resumed)".to_string()), error: None, local_version: None, remote_version: None, changes: None, details: None, duration_ms: 0, }); } for alias in &failed_set { all_results.push(AliasSyncResult { alias: alias.clone(), status: "skipped".to_string(), changed: false, reason: Some("previously failed (resumed)".to_string()), error: None, local_version: None, remote_version: None, changes: None, details: None, duration_ms: 0, }); } } all_results.extend(results); all_results.sort_by(|a, b| a.alias.cmp(&b.alias)); let succeeded = all_results.iter().filter(|r| r.status == "success").count(); let failed = all_results.iter().filter(|r| r.status == 
"failed").count(); let skipped = all_results.iter().filter(|r| r.status == "skipped").count(); let was_aborted = aborted.load(Ordering::Relaxed); let output = SyncAllOutput { total, succeeded, failed, skipped, aborted: was_aborted, results: all_results.clone(), dry_run, }; if robot_mode { robot::robot_success(output, "sync", start.elapsed()); } else { for r in &all_results { match r.status.as_str() { "success" if r.changed => { println!( " [ok] {} (changed, v{} -> v{})", r.alias, r.local_version.as_deref().unwrap_or("?"), r.remote_version.as_deref().unwrap_or("?"), ); } "success" => { println!(" [ok] {} (up to date)", r.alias,); } "failed" => { println!( " [FAIL] {} -- {}", r.alias, r.error.as_deref().unwrap_or("unknown error"), ); } "skipped" => { println!( " [skip] {} -- {}", r.alias, r.reason.as_deref().unwrap_or("skipped"), ); } _ => {} } } println!( "\nSync complete: {succeeded} ok, {failed} failed, {skipped} skipped (of {total} total)" ); if was_aborted { println!(" (aborted early: failure budget exceeded)"); } } // Clean up checkpoint on successful completion (no abort) if !was_aborted { remove_checkpoint(&cache_path); } Ok(()) } // --------------------------------------------------------------------------- // Public entry point // --------------------------------------------------------------------------- pub async fn execute( args: &Args, robot: bool, network_flag: &str, config_override: Option<&std::path::Path>, ) -> Result<(), SwaggerCliError> { if args.all { if args.alias.is_some() { return Err(SwaggerCliError::Usage( "cannot specify an alias with --all".into(), )); } let cache = crate::core::config::cache_dir(); let policy = resolve_policy(network_flag)?; return sync_all_inner(args, cache, robot, policy, config_override).await; } if args.alias.is_none() { return Err(SwaggerCliError::Usage( "alias is required when not using --all".into(), )); } let cache = crate::core::config::cache_dir(); let policy = resolve_policy(network_flag)?; sync_inner(args, 
cache, robot, policy, config_override).await } // --------------------------------------------------------------------------- // Tests // --------------------------------------------------------------------------- #[cfg(test)] mod tests { use super::*; use crate::core::cache::CacheManager; use crate::core::indexer::build_index; use crate::core::spec::{ IndexInfo, IndexedEndpoint, IndexedParam, IndexedSchema, IndexedTag, SpecIndex, }; fn make_test_index(endpoints: Vec, schemas: Vec) -> SpecIndex { let tags: Vec = vec![]; SpecIndex { index_version: 1, generation: 1, content_hash: "sha256:test".into(), openapi: "3.0.3".into(), info: IndexInfo { title: "Test".into(), version: "1.0.0".into(), }, endpoints, schemas, tags, } } fn make_endpoint(path: &str, method: &str, summary: Option<&str>) -> IndexedEndpoint { IndexedEndpoint { path: path.into(), method: method.into(), summary: summary.map(String::from), description: None, operation_id: None, tags: vec![], deprecated: false, parameters: vec![], request_body_required: false, request_body_content_types: vec![], security_schemes: vec![], security_required: false, operation_ptr: format!( "/paths/{}/{}", path.replace('/', "~1"), method.to_lowercase() ), } } fn make_schema(name: &str) -> IndexedSchema { IndexedSchema { name: name.into(), schema_ptr: format!("/components/schemas/{name}"), } } // -- Diff computation tests ------------------------------------------------ #[test] fn test_diff_no_changes() { let endpoints = vec![make_endpoint("/pets", "GET", Some("List pets"))]; let schemas = vec![make_schema("Pet")]; let old = make_test_index(endpoints.clone(), schemas.clone()); let new = make_test_index(endpoints, schemas); let (summary, details) = compute_diff(&old, &new); assert_eq!(summary.endpoints_added, 0); assert_eq!(summary.endpoints_removed, 0); assert_eq!(summary.endpoints_modified, 0); assert_eq!(summary.schemas_added, 0); assert_eq!(summary.schemas_removed, 0); assert!(!details.truncated); } #[test] fn 
// NOTE(review): this span begins mid-item — the `#[test] fn` prefix for
// `test_diff_added_endpoint` sits immediately before this chunk in the file.
test_diff_added_endpoint() {
    // Adding one HTTP method on an existing path registers as exactly one
    // added endpoint; the unchanged GET is not reported.
    let old = make_test_index(
        vec![make_endpoint("/pets", "GET", Some("List pets"))],
        vec![],
    );
    let new = make_test_index(
        vec![
            make_endpoint("/pets", "GET", Some("List pets")),
            make_endpoint("/pets", "POST", Some("Create pet")),
        ],
        vec![],
    );
    let (summary, details) = compute_diff(&old, &new);
    assert_eq!(summary.endpoints_added, 1);
    assert_eq!(summary.endpoints_removed, 0);
    assert_eq!(details.endpoints.added.len(), 1);
    assert_eq!(details.endpoints.added[0].path, "/pets");
    assert_eq!(details.endpoints.added[0].method, "POST");
}

// Dropping a method from a path is reported as a removed endpoint.
#[test]
fn test_diff_removed_endpoint() {
    let old = make_test_index(
        vec![
            make_endpoint("/pets", "GET", Some("List pets")),
            make_endpoint("/pets", "POST", Some("Create pet")),
        ],
        vec![],
    );
    let new = make_test_index(
        vec![make_endpoint("/pets", "GET", Some("List pets"))],
        vec![],
    );
    let (summary, details) = compute_diff(&old, &new);
    assert_eq!(summary.endpoints_removed, 1);
    assert_eq!(details.endpoints.removed.len(), 1);
    assert_eq!(details.endpoints.removed[0].method, "POST");
}

// A summary-text change on an existing endpoint counts as "modified",
// not as an add/remove pair.
#[test]
fn test_diff_modified_endpoint() {
    let old = make_test_index(
        vec![make_endpoint("/pets", "GET", Some("List pets"))],
        vec![],
    );
    let new = make_test_index(
        vec![make_endpoint("/pets", "GET", Some("List all pets"))],
        vec![],
    );
    let (summary, _details) = compute_diff(&old, &new);
    assert_eq!(summary.endpoints_modified, 1);
}

// A new component schema shows up in both the summary count and the
// detailed added list.
#[test]
fn test_diff_added_schema() {
    let old = make_test_index(vec![], vec![make_schema("Pet")]);
    let new = make_test_index(vec![], vec![make_schema("Pet"), make_schema("Error")]);
    let (summary, details) = compute_diff(&old, &new);
    assert_eq!(summary.schemas_added, 1);
    assert_eq!(details.schemas.added, vec!["Error"]);
}

// A removed component schema is reported symmetrically to the added case.
#[test]
fn test_diff_removed_schema() {
    let old = make_test_index(vec![], vec![make_schema("Pet"), make_schema("Error")]);
    let new = make_test_index(vec![], vec![make_schema("Pet")]);
    let (summary, details) = compute_diff(&old, &new);
    assert_eq!(summary.schemas_removed, 1);
    assert_eq!(details.schemas.removed, vec!["Error"]);
}

// Parameter-list changes alone (same path/method/summary) must flag the
// endpoint as modified.
#[test]
fn test_diff_endpoint_modified_by_params() {
    let mut ep_old = make_endpoint("/pets", "GET", Some("List pets"));
    ep_old.parameters = vec![IndexedParam {
        name: "limit".into(),
        location: "query".into(),
        required: false,
        description: None,
    }];
    let mut ep_new = make_endpoint("/pets", "GET", Some("List pets"));
    ep_new.parameters = vec![
        IndexedParam {
            name: "limit".into(),
            location: "query".into(),
            required: false,
            description: None,
        },
        IndexedParam {
            name: "offset".into(),
            location: "query".into(),
            required: false,
            description: None,
        },
    ];
    let old = make_test_index(vec![ep_old], vec![]);
    let new = make_test_index(vec![ep_new], vec![]);
    let (summary, _) = compute_diff(&old, &new);
    assert_eq!(summary.endpoints_modified, 1);
}

// Detail lists are capped at MAX_DETAIL_ITEMS (200, per the const in this
// file) while the summary still reports the full count.
#[test]
fn test_diff_truncation() {
    // Create enough items to exceed MAX_DETAIL_ITEMS
    // NOTE(review): the element type of this Vec appears stripped by
    // extraction (`Vec` with no generic argument) — verify against the
    // original file.
    let mut new_endpoints: Vec = Vec::new();
    for i in 0..250 {
        new_endpoints.push(make_endpoint(&format!("/item{i}"), "GET", None));
    }
    let old = make_test_index(vec![], vec![]);
    let new = make_test_index(new_endpoints, vec![]);
    let (summary, details) = compute_diff(&old, &new);
    assert_eq!(summary.endpoints_added, 250);
    assert!(details.truncated);
    assert_eq!(details.endpoints.added.len(), MAX_DETAIL_ITEMS);
}

// -- Hash-based change detection (unit) ------------------------------------

// Identical raw bytes hash identically (the "no change" fast path).
#[test]
fn test_sync_no_changes_same_hash() {
    // Simulate: same content hash -> no changes
    let raw_bytes = br#"{"openapi":"3.0.3","info":{"title":"Test","version":"1.0.0"},"paths":{}}"#;
    let content_hash = compute_hash(raw_bytes);
    assert_eq!(content_hash, compute_hash(raw_bytes));
}

// Any byte-level difference (here: the version field) changes the hash.
#[test]
fn test_sync_changes_different_hash() {
    let raw_v1 = br#"{"openapi":"3.0.3","info":{"title":"Test","version":"1.0.0"},"paths":{}}"#;
    let raw_v2 = br#"{"openapi":"3.0.3","info":{"title":"Test","version":"2.0.0"},"paths":{}}"#;
    let hash_v1 = compute_hash(raw_v1);
    let hash_v2 = compute_hash(raw_v2);
    assert_ne!(hash_v1, hash_v2);
}

// -- Integration: sync with local cache ------------------------------------

// Seed an on-disk cache entry for `alias` so sync has something to compare
// against; returns the metadata written by CacheManager.
// NOTE(review): the generic argument of `url: Option` appears stripped by
// extraction — verify (presumably Option<String>) against the original file.
fn write_test_cache(
    cache_path: &std::path::Path,
    alias: &str,
    spec_json: &serde_json::Value,
    url: Option,
) -> CacheMetadata {
    let cm = CacheManager::new(cache_path.to_path_buf());
    let raw_bytes = serde_json::to_vec(spec_json).unwrap();
    let content_hash = compute_hash(&raw_bytes);
    let json_bytes = raw_bytes.clone();
    let index = build_index(spec_json, &content_hash, 1).unwrap();
    cm.write_cache(
        alias,
        &raw_bytes,
        &json_bytes,
        &index,
        url,
        &index.info.version,
        &index.info.title,
        "json",
        Some("\"etag-v1\"".to_string()),
        Some("Wed, 21 Oct 2025 07:28:00 GMT".to_string()),
        None,
    )
    .unwrap()
}

// End-to-end through build_index: a POST added in the v2 spec surfaces as
// exactly one added endpoint in the computed diff.
#[test]
fn test_sync_diff_detects_new_endpoint_in_index() {
    let spec_v1 = serde_json::json!({
        "openapi": "3.0.3",
        "info": { "title": "Test", "version": "1.0.0" },
        "paths": {
            "/pets": {
                "get": { "summary": "List pets", "responses": { "200": { "description": "OK" } } }
            }
        }
    });
    let spec_v2 = serde_json::json!({
        "openapi": "3.0.3",
        "info": { "title": "Test", "version": "2.0.0" },
        "paths": {
            "/pets": {
                "get": { "summary": "List pets", "responses": { "200": { "description": "OK" } } },
                "post": { "summary": "Create pet", "responses": { "201": { "description": "Created" } } }
            }
        }
    });
    let raw_v1 = serde_json::to_vec(&spec_v1).unwrap();
    let raw_v2 = serde_json::to_vec(&spec_v2).unwrap();
    let hash_v1 = compute_hash(&raw_v1);
    let hash_v2 = compute_hash(&raw_v2);
    let index_v1 = build_index(&spec_v1, &hash_v1, 1).unwrap();
    let index_v2 = build_index(&spec_v2, &hash_v2, 2).unwrap();
    let (summary, details) = compute_diff(&index_v1, &index_v2);
    assert_eq!(summary.endpoints_added, 1);
    assert_eq!(summary.endpoints_removed, 0);
    assert_eq!(summary.endpoints_modified, 0);
    assert_eq!(details.endpoints.added.len(), 1);
    assert_eq!(details.endpoints.added[0].path, "/pets");
    assert_eq!(details.endpoints.added[0].method, "POST");
}

// All five change categories at once: added, removed, and modified
// endpoints plus added and removed schemas.
#[test]
fn test_compute_diff_complex_scenario() {
    // Old: GET /pets, GET /pets/{id}, DELETE /pets/{id}, schemas: Pet, Error
    // New: GET /pets (modified summary), POST /pets, GET /pets/{id}, schemas: Pet, Owner
    let old = make_test_index(
        vec![
            make_endpoint("/pets", "GET", Some("List pets")),
            make_endpoint("/pets/{id}", "GET", Some("Get pet")),
            make_endpoint("/pets/{id}", "DELETE", Some("Delete pet")),
        ],
        vec![make_schema("Pet"), make_schema("Error")],
    );
    let new = make_test_index(
        vec![
            make_endpoint("/pets", "GET", Some("List all pets")), // modified summary
            make_endpoint("/pets", "POST", Some("Create pet")),   // added
            make_endpoint("/pets/{id}", "GET", Some("Get pet")),  // unchanged
        ],
        vec![make_schema("Pet"), make_schema("Owner")], // Error removed, Owner added
    );
    let (summary, details) = compute_diff(&old, &new);
    assert_eq!(summary.endpoints_added, 1); // POST /pets
    assert_eq!(summary.endpoints_removed, 1); // DELETE /pets/{id}
    assert_eq!(summary.endpoints_modified, 1); // GET /pets summary changed
    assert_eq!(summary.schemas_added, 1); // Owner
    assert_eq!(summary.schemas_removed, 1); // Error
    assert_eq!(details.endpoints.added[0].method, "POST");
    assert_eq!(details.endpoints.removed[0].method, "DELETE");
    assert_eq!(details.endpoints.modified[0].path, "/pets");
    assert_eq!(details.schemas.added, vec!["Owner"]);
    assert_eq!(details.schemas.removed, vec!["Error"]);
    assert!(!details.truncated);
}

// -- Checkpoint tests ------------------------------------------------------

// save_checkpoint / load_checkpoint round-trip preserves every field.
#[test]
fn test_checkpoint_roundtrip() {
    let tmp = tempfile::tempdir().unwrap();
    let checkpoint = SyncCheckpoint {
        started_at: "2025-01-01T00:00:00Z".to_string(),
        aliases_completed: vec!["api1".into(), "api2".into()],
        aliases_failed: vec!["api3".into()],
    };
    save_checkpoint(tmp.path(), &checkpoint).unwrap();
    let loaded = load_checkpoint(tmp.path()).unwrap();
    assert_eq!(loaded.started_at, checkpoint.started_at);
    assert_eq!(loaded.aliases_completed, checkpoint.aliases_completed);
    assert_eq!(loaded.aliases_failed, checkpoint.aliases_failed);
}

// A missing checkpoint file yields None rather than an error.
#[test]
fn test_checkpoint_missing_returns_none() {
    let tmp = tempfile::tempdir().unwrap();
    assert!(load_checkpoint(tmp.path()).is_none());
}

// Unparseable checkpoint contents are treated the same as no checkpoint.
#[test]
fn test_checkpoint_corrupt_returns_none() {
    let tmp = tempfile::tempdir().unwrap();
    std::fs::write(tmp.path().join(CHECKPOINT_FILE), b"not json").unwrap();
    assert!(load_checkpoint(tmp.path()).is_none());
}

// remove_checkpoint deletes the file a prior save_checkpoint created.
#[test]
fn test_remove_checkpoint() {
    let tmp = tempfile::tempdir().unwrap();
    let checkpoint = SyncCheckpoint {
        started_at: "2025-01-01T00:00:00Z".to_string(),
        aliases_completed: vec![],
        aliases_failed: vec![],
    };
    save_checkpoint(tmp.path(), &checkpoint).unwrap();
    assert!(tmp.path().join(CHECKPOINT_FILE).exists());
    remove_checkpoint(tmp.path());
    assert!(!tmp.path().join(CHECKPOINT_FILE).exists());
}

// -- Per-host throttle tests -----------------------------------------------

// With a per-host limit of 1, a second acquire for the same host must block
// until the first permit is dropped; other hosts are unaffected.
#[tokio::test]
async fn test_per_host_throttle_limits_concurrency() {
    let throttle = PerHostThrottle::new(1);
    // Acquire one permit for "example.com"
    let permit1 = throttle.acquire("example.com").await;
    // A second acquire should block; race it against a short timeout so the
    // blocking is observable without hanging the test.
    let result =
        tokio::time::timeout(Duration::from_millis(50), throttle.acquire("example.com")).await;
    // Should timeout because the first permit is held
    assert!(result.is_err(), "second acquire should block");
    // Different host should work fine
    let _permit_other =
        tokio::time::timeout(Duration::from_millis(50), throttle.acquire("other.com")).await;
    assert!(_permit_other.is_ok(), "different host should not block");
    // Drop permit1, now acquiring for example.com should work
    drop(permit1);
    let result2 =
        tokio::time::timeout(Duration::from_millis(50), throttle.acquire("example.com")).await;
    assert!(result2.is_ok(), "should succeed after drop");
}

// extract_host: hostname without scheme/port/path; non-URLs map to "".
#[test]
fn test_extract_host() {
    assert_eq!(
        extract_host("https://api.example.com/v1/spec.json"),
        "api.example.com"
    );
    assert_eq!(extract_host("http://localhost:8080/spec"), "localhost");
    assert_eq!(extract_host("not-a-url"), "");
}

// -- sync --all with mockito -----------------------------------------------

// An empty cache directory means nothing to sync; --all still succeeds.
#[tokio::test]
async fn test_sync_all_empty_cache() {
    let tmp = tempfile::tempdir().unwrap();
    let cache_path = tmp.path().to_path_buf();
    // Create cache dir but no aliases
    std::fs::create_dir_all(&cache_path).unwrap();
    let args = Args {
        alias: None,
        all: true,
        dry_run: false,
        force: false,
        details: false,
        auth: None,
        jobs: 4,
        per_host: 2,
        max_failures: None,
        resume: false,
        allow_private_host: vec![],
    };
    let result = sync_all_inner(&args, cache_path, true, NetworkPolicy::Auto, None).await;
    assert!(result.is_ok());
}

// Four aliases against one mock server: every mock endpoint is hit and the
// checkpoint is cleaned up after a fully successful run.
#[tokio::test]
async fn test_sync_all_concurrent() {
    // Set up 4 aliases pointing to a mock server
    let mut server = mockito::Server::new_async().await;
    let base_url = server.url();
    let tmp = tempfile::tempdir().unwrap();
    let cache_path = tmp.path().to_path_buf();
    let specs: Vec<(&str, serde_json::Value)> = vec![
        (
            "api1",
            serde_json::json!({
                "openapi": "3.0.3",
                "info": { "title": "API 1", "version": "1.0.0" },
                "paths": { "/a": { "get": { "summary": "A", "responses": { "200": { "description": "OK" }}}}}
            }),
        ),
        (
            "api2",
            serde_json::json!({
                "openapi": "3.0.3",
                "info": { "title": "API 2", "version": "1.0.0" },
                "paths": { "/b": { "get": { "summary": "B", "responses": { "200": { "description": "OK" }}}}}
            }),
        ),
        (
            "api3",
            serde_json::json!({
                "openapi": "3.0.3",
                "info": { "title": "API 3", "version": "1.0.0" },
                "paths": { "/c": { "get": { "summary": "C", "responses": { "200": { "description": "OK" }}}}}
            }),
        ),
        (
            "api4",
            serde_json::json!({
                "openapi": "3.0.3",
                "info": { "title": "API 4", "version": "1.0.0" },
                "paths": { "/d": { "get": { "summary": "D", "responses": { "200": { "description": "OK" }}}}}
            }),
        ),
    ];
    // Create mocks for each spec -- return same content (no changes)
    let mut mocks = Vec::new();
    for (alias, spec) in &specs {
        let path = format!("/{alias}.json");
        let body = serde_json::to_string(spec).unwrap();
        let mock = server
            .mock("GET", path.as_str())
            .with_status(200)
            .with_header("content-type", "application/json")
            .with_body(&body)
            .create_async()
            .await;
        mocks.push(mock);
        // Write initial cache
        let url = format!("{base_url}/{alias}.json");
        write_test_cache(&cache_path, alias, spec, Some(url));
    }
    let mock_host = extract_host(&base_url);
    let args = Args {
        alias: None,
        all: true,
        dry_run: false,
        force: true, // Force re-fetch to trigger network calls
        details: false,
        auth: None,
        jobs: 2,
        per_host: 2,
        max_failures: None,
        resume: false,
        allow_private_host: vec![mock_host],
    };
    // Write a minimal config
    let config_dir = tmp.path().join("config");
    std::fs::create_dir_all(&config_dir).unwrap();
    let config_path = config_dir.join("config.toml");
    std::fs::write(&config_path, b"").unwrap();
    let result = sync_all_inner(
        &args,
        cache_path.clone(),
        true,
        NetworkPolicy::Auto,
        Some(&config_path),
    )
    .await;
    assert!(result.is_ok());
    // Verify all mocks were called
    for mock in &mocks {
        mock.assert_async().await;
    }
    // Verify checkpoint was cleaned up (no abort)
    assert!(!cache_path.join(CHECKPOINT_FILE).exists());
}

// With jobs=1 and --max-failures=2, the run aborts once the failure budget
// is exhausted and leaves the checkpoint on disk for a later --resume.
#[tokio::test]
async fn test_sync_all_failure_budget() {
    // Set up aliases where some will fail (404) to test --max-failures
    let mut server = mockito::Server::new_async().await;
    let base_url = server.url();
    let tmp = tempfile::tempdir().unwrap();
    let cache_path = tmp.path().to_path_buf();
    let spec = serde_json::json!({
        "openapi": "3.0.3",
        "info": { "title": "Test", "version": "1.0.0" },
        "paths": { "/x": { "get": { "summary": "X", "responses": { "200": { "description": "OK" }}}}}
    });
    // Create 4 aliases, all pointing to endpoints that 404
    for alias in &["fail1", "fail2", "fail3", "fail4"] {
        let url = format!("{base_url}/{alias}.json");
        write_test_cache(&cache_path, alias, &spec, Some(url));
        let path = format!("/{alias}.json");
        server
            .mock("GET", path.as_str())
            .with_status(404)
            .create_async()
            .await;
    }
    let mock_host = extract_host(&base_url);
    let args = Args {
        alias: None,
        all: true,
        dry_run: false,
        force: true,
        details: false,
        auth: None,
        jobs: 1, // Sequential to make failure budget deterministic
        per_host: 2,
        max_failures: Some(2),
        resume: false,
        allow_private_host: vec![mock_host],
    };
    let config_dir = tmp.path().join("config");
    std::fs::create_dir_all(&config_dir).unwrap();
    let config_path_file = config_dir.join("config.toml");
    std::fs::write(&config_path_file, b"").unwrap();
    let result = sync_all_inner(
        &args,
        cache_path.clone(),
        true,
        NetworkPolicy::Auto,
        Some(&config_path_file),
    )
    .await;
    assert!(result.is_ok());
    // Checkpoint should still exist since we aborted
    assert!(cache_path.join(CHECKPOINT_FILE).exists());
}

// Aliases cached from a local file (no source URL) are skipped, not errors.
#[tokio::test]
async fn test_sync_all_skips_aliases_without_urls() {
    let tmp = tempfile::tempdir().unwrap();
    let cache_path = tmp.path().to_path_buf();
    let spec = serde_json::json!({
        "openapi": "3.0.3",
        "info": { "title": "Local", "version": "1.0.0" },
        "paths": {}
    });
    // Create alias with no URL (fetched from file)
    write_test_cache(&cache_path, "local-only", &spec, None);
    let args = Args {
        alias: None,
        all: true,
        dry_run: false,
        force: false,
        details: false,
        auth: None,
        jobs: 4,
        per_host: 2,
        max_failures: None,
        resume: false,
        allow_private_host: vec![],
    };
    let result = sync_all_inner(&args, cache_path, true, NetworkPolicy::Auto, None).await;
    // Should succeed with 0 aliases to sync (local-only has no URL)
    assert!(result.is_ok());
}

// --resume honors a prior checkpoint: completed aliases are not re-fetched,
// and the checkpoint is removed once the remaining aliases succeed.
#[tokio::test]
async fn test_sync_all_resume_skips_completed() {
    let tmp = tempfile::tempdir().unwrap();
    let cache_path = tmp.path().to_path_buf();
    let mut server = mockito::Server::new_async().await;
    let base_url = server.url();
    let spec = serde_json::json!({
        "openapi": "3.0.3",
        "info": { "title": "Test", "version": "1.0.0" },
        "paths": { "/x": { "get": { "summary": "X", "responses": { "200": { "description": "OK" }}}}}
    });
    // Create 3 aliases
    for alias in &["r1", "r2", "r3"] {
        let url = format!("{base_url}/{alias}.json");
        write_test_cache(&cache_path, alias, &spec, Some(url));
    }
    // Write a checkpoint saying r1 is already completed
    let checkpoint = SyncCheckpoint {
        started_at: "2025-01-01T00:00:00Z".to_string(),
        aliases_completed: vec!["r1".into()],
        aliases_failed: vec![],
    };
    save_checkpoint(&cache_path, &checkpoint).unwrap();
    // Only r2 and r3 should be fetched
    let _mock_r2 = server
        .mock("GET", "/r2.json")
        .with_status(200)
        .with_header("content-type", "application/json")
        .with_body(serde_json::to_string(&spec).unwrap())
        .create_async()
        .await;
    let _mock_r3 = server
        .mock("GET", "/r3.json")
        .with_status(200)
        .with_header("content-type", "application/json")
        .with_body(serde_json::to_string(&spec).unwrap())
        .create_async()
        .await;
    let mock_host = extract_host(&base_url);
    let args = Args {
        alias: None,
        all: true,
        dry_run: false,
        force: true,
        details: false,
        auth: None,
        jobs: 4,
        per_host: 2,
        max_failures: None,
        resume: true,
        allow_private_host: vec![mock_host],
    };
    let config_dir = tmp.path().join("config");
    std::fs::create_dir_all(&config_dir).unwrap();
    let config_path_file = config_dir.join("config.toml");
    std::fs::write(&config_path_file, b"").unwrap();
    let result = sync_all_inner(
        &args,
        cache_path.clone(),
        true,
        NetworkPolicy::Auto,
        Some(&config_path_file),
    )
    .await;
    assert!(result.is_ok());
    // r2 and r3 should have been fetched
    _mock_r2.assert_async().await;
    _mock_r3.assert_async().await;
    // Checkpoint should be cleaned up
    assert!(!cache_path.join(CHECKPOINT_FILE).exists());
}
// Closes the enclosing test module whose header lies before this chunk.
}