diff --git a/internal/pipeline/aggregator.go b/internal/pipeline/aggregator.go new file mode 100644 index 0000000..906d6a1 --- /dev/null +++ b/internal/pipeline/aggregator.go @@ -0,0 +1,261 @@ +package pipeline + +import ( + "sort" + "strings" + "time" + + "cburn/internal/config" + "cburn/internal/model" +) + +// Aggregate computes summary statistics from a slice of session stats, +// filtered to sessions within the given time range. +func Aggregate(sessions []model.SessionStats, since, until time.Time) model.SummaryStats { + filtered := FilterByTime(sessions, since, until) + + var stats model.SummaryStats + activeDays := make(map[string]struct{}) + + for _, s := range filtered { + stats.TotalSessions++ + stats.TotalPrompts += s.UserMessages + stats.TotalAPICalls += s.APICalls + stats.TotalDurationSecs += s.DurationSecs + + stats.InputTokens += s.InputTokens + stats.OutputTokens += s.OutputTokens + stats.CacheCreation5mTokens += s.CacheCreation5mTokens + stats.CacheCreation1hTokens += s.CacheCreation1hTokens + stats.CacheReadTokens += s.CacheReadTokens + stats.EstimatedCost += s.EstimatedCost + + if !s.StartTime.IsZero() { + day := s.StartTime.Local().Format("2006-01-02") + activeDays[day] = struct{}{} + } + } + + stats.ActiveDays = len(activeDays) + stats.TotalBilledTokens = stats.InputTokens + stats.OutputTokens + + stats.CacheCreation5mTokens + stats.CacheCreation1hTokens + + // Cache hit rate + totalCacheInput := stats.CacheReadTokens + stats.CacheCreation5mTokens + + stats.CacheCreation1hTokens + stats.InputTokens + if totalCacheInput > 0 { + stats.CacheHitRate = float64(stats.CacheReadTokens) / float64(totalCacheInput) + } + + // Cache savings (sum across all models found in sessions) + for _, s := range filtered { + for modelName, mu := range s.Models { + stats.CacheSavings += config.CalculateCacheSavings(modelName, mu.CacheReadTokens) + } + } + + // Per-active-day rates + if stats.ActiveDays > 0 { + days := float64(stats.ActiveDays) + stats.CostPerDay = stats.EstimatedCost / days + stats.TokensPerDay = int64(float64(stats.TotalBilledTokens) / days) + stats.SessionsPerDay = float64(stats.TotalSessions) / days + stats.PromptsPerDay = float64(stats.TotalPrompts) / days + stats.MinutesPerDay = float64(stats.TotalDurationSecs) / 60 / days + } + + return stats +} + +// AggregateDays computes per-day statistics from sessions. +func AggregateDays(sessions []model.SessionStats, since, until time.Time) []model.DailyStats { + filtered := FilterByTime(sessions, since, until) + + dayMap := make(map[string]*model.DailyStats) + + for _, s := range filtered { + if s.StartTime.IsZero() { + continue + } + dayKey := s.StartTime.Local().Format("2006-01-02") + ds, ok := dayMap[dayKey] + if !ok { + t, _ := time.ParseInLocation("2006-01-02", dayKey, time.Local) + ds = &model.DailyStats{Date: t} + dayMap[dayKey] = ds + } + + ds.Sessions++ + ds.Prompts += s.UserMessages + ds.APICalls += s.APICalls + ds.DurationSecs += s.DurationSecs + ds.InputTokens += s.InputTokens + ds.OutputTokens += s.OutputTokens + ds.CacheCreation5m += s.CacheCreation5mTokens + ds.CacheCreation1h += s.CacheCreation1hTokens + ds.CacheReadTokens += s.CacheReadTokens + ds.EstimatedCost += s.EstimatedCost + } + + // Convert to sorted slice (most recent first) + days := make([]model.DailyStats, 0, len(dayMap)) + for _, ds := range dayMap { + days = append(days, *ds) + } + sort.Slice(days, func(i, j int) bool { + return days[i].Date.After(days[j].Date) + }) + + return days +} + +// AggregateModels computes per-model statistics from sessions. +func AggregateModels(sessions []model.SessionStats, since, until time.Time) []model.ModelStats { + filtered := FilterByTime(sessions, since, until) + + modelMap := make(map[string]*model.ModelStats) + totalCalls := 0 + + for _, s := range filtered { + for modelName, mu := range s.Models { + ms, ok := modelMap[modelName] + if !ok { + ms = &model.ModelStats{Model: modelName} + modelMap[modelName] = ms + } + ms.APICalls += mu.APICalls + ms.InputTokens += mu.InputTokens + ms.OutputTokens += mu.OutputTokens + ms.CacheCreation5m += mu.CacheCreation5mTokens + ms.CacheCreation1h += mu.CacheCreation1hTokens + ms.CacheReadTokens += mu.CacheReadTokens + ms.EstimatedCost += mu.EstimatedCost + totalCalls += mu.APICalls + } + } + + // Compute share percentages and sort by cost descending + models := make([]model.ModelStats, 0, len(modelMap)) + for _, ms := range modelMap { + if totalCalls > 0 { + ms.SharePercent = float64(ms.APICalls) / float64(totalCalls) * 100 + } + models = append(models, *ms) + } + sort.Slice(models, func(i, j int) bool { + return models[i].EstimatedCost > models[j].EstimatedCost + }) + + return models +} + +// AggregateProjects computes per-project statistics from sessions. +func AggregateProjects(sessions []model.SessionStats, since, until time.Time) []model.ProjectStats { + filtered := FilterByTime(sessions, since, until) + + projMap := make(map[string]*model.ProjectStats) + + for _, s := range filtered { + ps, ok := projMap[s.Project] + if !ok { + ps = &model.ProjectStats{Project: s.Project} + projMap[s.Project] = ps + } + ps.Sessions++ + ps.Prompts += s.UserMessages + ps.TotalTokens += s.InputTokens + s.OutputTokens + + s.CacheCreation5mTokens + s.CacheCreation1hTokens + ps.EstimatedCost += s.EstimatedCost + } + + // Sort by cost descending + projects := make([]model.ProjectStats, 0, len(projMap)) + for _, ps := range projMap { + projects = append(projects, *ps) + } + sort.Slice(projects, func(i, j int) bool { + return projects[i].EstimatedCost > projects[j].EstimatedCost + }) + + return projects +} + +// AggregateHourly computes prompt counts by hour of day. +func AggregateHourly(sessions []model.SessionStats, since, until time.Time) []model.HourlyStats { + filtered := FilterByTime(sessions, since, until) + + hours := make([]model.HourlyStats, 24) + for i := range hours { + hours[i].Hour = i + } + + // We attribute all prompts and tokens to the session's start hour + for _, s := range filtered { + if s.StartTime.IsZero() { + continue + } + h := s.StartTime.Local().Hour() + hours[h].Prompts += s.UserMessages + hours[h].Sessions++ + hours[h].Tokens += s.InputTokens + s.OutputTokens + } + + return hours +} + +// FilterByTime returns sessions whose start time falls within [since, until). +func FilterByTime(sessions []model.SessionStats, since, until time.Time) []model.SessionStats { + if since.IsZero() && until.IsZero() { + return sessions + } + + var result []model.SessionStats + for _, s := range sessions { + if s.StartTime.IsZero() { + continue + } + if !since.IsZero() && s.StartTime.Before(since) { + continue + } + if !until.IsZero() && !s.StartTime.Before(until) { + continue + } + result = append(result, s) + } + return result +} + +// FilterByProject returns sessions matching the project substring. +func FilterByProject(sessions []model.SessionStats, project string) []model.SessionStats { + if project == "" { + return sessions + } + var result []model.SessionStats + for _, s := range sessions { + if containsIgnoreCase(s.Project, project) { + result = append(result, s) + } + } + return result +} + +// FilterByModel returns sessions that have at least one API call to the given model. +func FilterByModel(sessions []model.SessionStats, modelFilter string) []model.SessionStats { + if modelFilter == "" { + return sessions + } + var result []model.SessionStats + for _, s := range sessions { + for m := range s.Models { + if containsIgnoreCase(m, modelFilter) { + result = append(result, s) + break + } + } + } + return result +} + +func containsIgnoreCase(s, substr string) bool { + return strings.Contains(strings.ToLower(s), strings.ToLower(substr)) +} diff --git a/internal/pipeline/bench_test.go b/internal/pipeline/bench_test.go new file mode 100644 index 0000000..5bcd311 --- /dev/null +++ b/internal/pipeline/bench_test.go @@ -0,0 +1,92 @@ +package pipeline + +import ( + "os" + "path/filepath" + "testing" + + "cburn/internal/source" + "cburn/internal/store" +) + +func BenchmarkLoad(b *testing.B) { + homeDir, _ := os.UserHomeDir() + claudeDir := filepath.Join(homeDir, ".claude") + + b.ResetTimer() + for i := 0; i < b.N; i++ { + result, err := Load(claudeDir, true, nil) + if err != nil { + b.Fatal(err) + } + _ = result + } +} + +func BenchmarkParseFile(b *testing.B) { + homeDir, _ := os.UserHomeDir() + claudeDir := filepath.Join(homeDir, ".claude") + + files, err := source.ScanDir(claudeDir) + if err != nil { + b.Fatal(err) + } + + // Find the largest file for worst-case benchmarking + var biggest source.DiscoveredFile + var biggestSize int64 + for _, f := range files { + info, err := os.Stat(f.Path) + if err != nil { + continue + } + if info.Size() > biggestSize { + biggestSize = info.Size() + biggest = f + } + } + + b.Logf("Benchmarking largest file: %s (%.1f KB)", biggest.Path, float64(biggestSize)/1024) + b.ResetTimer() + + for i := 0; i < b.N; i++ { + result := source.ParseFile(biggest) + if result.Err != nil { + b.Fatal(result.Err) + } + } +} + +func BenchmarkScanDir(b *testing.B) { + homeDir, _ := os.UserHomeDir() + claudeDir := filepath.Join(homeDir, ".claude") + + b.ResetTimer() + for i := 0; i < b.N; i++ { + files, err := source.ScanDir(claudeDir) + if err != nil { + b.Fatal(err) + } + _ = files + } +} + +func BenchmarkLoadWithCache(b *testing.B) { + homeDir, _ := os.UserHomeDir() + claudeDir := filepath.Join(homeDir, ".claude") + + cache, err := store.Open(CachePath()) + if err != nil { + b.Fatal(err) + } + defer cache.Close() + + b.ResetTimer() + for i := 0; i < b.N; i++ { + cr, err := LoadWithCache(claudeDir, true, cache, nil) + if err != nil { + b.Fatal(err) + } + _ = cr + } +} diff --git a/internal/pipeline/incremental.go b/internal/pipeline/incremental.go new file mode 100644 index 0000000..35fa7b2 --- /dev/null +++ b/internal/pipeline/incremental.go @@ -0,0 +1,177 @@ +package pipeline + +import ( + "fmt" + "os" + "path/filepath" + "runtime" + "sync" + "sync/atomic" + + "cburn/internal/source" + "cburn/internal/store" +) + +// CachedLoadResult extends LoadResult with cache metadata. +type CachedLoadResult struct { + LoadResult + CacheHits int + Reparsed int +} + +// LoadWithCache discovers, diffs against cache, parses only changed files, +// and returns the combined result set. +func LoadWithCache(claudeDir string, includeSubagents bool, cache *store.Cache, progressFn ProgressFunc) (*CachedLoadResult, error) { + // Discover files + files, err := source.ScanDir(claudeDir) + if err != nil { + return nil, fmt.Errorf("scanning %s: %w", claudeDir, err) + } + + if len(files) == 0 { + return &CachedLoadResult{}, nil + } + + // Filter subagents if requested + var toProcess []source.DiscoveredFile + if includeSubagents { + toProcess = files + } else { + for _, f := range files { + if !f.IsSubagent { + toProcess = append(toProcess, f) + } + } + } + + result := &CachedLoadResult{ + LoadResult: LoadResult{ + TotalFiles: len(toProcess), + ProjectCount: source.CountProjects(files), + }, + } + + if len(toProcess) == 0 { + return result, nil + } + + // Get tracked files from cache + tracked, err := cache.GetTrackedFiles() + if err != nil { + return nil, fmt.Errorf("reading cache: %w", err) + } + + // Diff: partition into changed and unchanged + var toReparse []source.DiscoveredFile + var unchanged []string // file paths that haven't changed + + for _, f := range toProcess { + info, err := os.Stat(f.Path) + if err != nil { + continue + } + + cached, ok := tracked[f.Path] + if ok && cached.MtimeNs == info.ModTime().UnixNano() && cached.SizeBytes == info.Size() { + unchanged = append(unchanged, f.Path) + } else { + toReparse = append(toReparse, f) + } + } + + result.CacheHits = len(unchanged) + result.Reparsed = len(toReparse) + + // Load cached sessions + if len(unchanged) > 0 { + cached, err := cache.LoadAllSessions() + if err != nil { + return nil, fmt.Errorf("loading cached sessions: %w", err) + } + + // Filter to only sessions from unchanged files + unchangedSet := make(map[string]struct{}, len(unchanged)) + for _, p := range unchanged { + unchangedSet[p] = struct{}{} + } + for _, s := range cached { + if _, ok := unchangedSet[s.FilePath]; ok { + result.Sessions = append(result.Sessions, s) + result.ParsedFiles++ + } + } + } + + // Parse changed files + if len(toReparse) > 0 { + numWorkers := runtime.GOMAXPROCS(0) + if numWorkers < 1 { + numWorkers = 4 + } + if numWorkers > len(toReparse) { + numWorkers = len(toReparse) + } + + work := make(chan int, len(toReparse)) + results := make([]source.ParseResult, len(toReparse)) + var wg sync.WaitGroup + var processed atomic.Int64 + + for i := range toReparse { + work <- i + } + close(work) + + wg.Add(numWorkers) + for w := 0; w < numWorkers; w++ { + go func() { + defer wg.Done() + for idx := range work { + results[idx] = source.ParseFile(toReparse[idx]) + n := processed.Add(1) + if progressFn != nil { + progressFn(int(n)+result.CacheHits, result.TotalFiles) + } + } + }() + } + + wg.Wait() + + // Collect and cache results + for i, pr := range results { + if pr.Err != nil { + result.FileErrors++ + continue + } + result.ParsedFiles++ + result.ParseErrors += pr.ParseErrors + + if pr.Stats.APICalls > 0 || pr.Stats.UserMessages > 0 { + result.Sessions = append(result.Sessions, pr.Stats) + + // Save to cache + info, err := os.Stat(toReparse[i].Path) + if err == nil { + _ = cache.SaveSession(pr.Stats, info.ModTime().UnixNano(), info.Size()) + } + } + } + } + + return result, nil +} + +// CacheDir returns the platform-appropriate cache directory. +func CacheDir() string { + if xdg := os.Getenv("XDG_CACHE_HOME"); xdg != "" { + return filepath.Join(xdg, "cburn") + } + home, _ := os.UserHomeDir() + return filepath.Join(home, ".cache", "cburn") +} + +// CachePath returns the full path to the cache database. +func CachePath() string { + return filepath.Join(CacheDir(), "metrics.db") +} diff --git a/internal/pipeline/loader.go b/internal/pipeline/loader.go new file mode 100644 index 0000000..e106d93 --- /dev/null +++ b/internal/pipeline/loader.go @@ -0,0 +1,117 @@ +package pipeline + +import ( + "fmt" + "runtime" + "sync" + "sync/atomic" + + "cburn/internal/model" + "cburn/internal/source" +) + +// LoadResult holds the output of the full data loading pipeline. +type LoadResult struct { + Sessions []model.SessionStats + TotalFiles int + ParsedFiles int + ParseErrors int + FileErrors int + ProjectCount int +} + +// ProgressFunc is called during loading to report progress. +// current is the number of files processed so far, total is the total count. +type ProgressFunc func(current, total int) + +// Load discovers and parses all session files from the Claude data directory. +// It uses a bounded worker pool for parallel parsing. +func Load(claudeDir string, includeSubagents bool, progressFn ProgressFunc) (*LoadResult, error) { + // Discover files + files, err := source.ScanDir(claudeDir) + if err != nil { + return nil, fmt.Errorf("scanning %s: %w", claudeDir, err) + } + + if len(files) == 0 { + return &LoadResult{}, nil + } + + // Filter subagents if requested + var toProcess []source.DiscoveredFile + if includeSubagents { + toProcess = files + } else { + for _, f := range files { + if !f.IsSubagent { + toProcess = append(toProcess, f) + } + } + } + + result := &LoadResult{ + TotalFiles: len(toProcess), + ProjectCount: source.CountProjects(files), + } + + if len(toProcess) == 0 { + return result, nil + } + + // Parallel parsing with bounded worker pool + numWorkers := runtime.GOMAXPROCS(0) + if numWorkers < 1 { + numWorkers = 4 + } + if numWorkers > len(toProcess) { + numWorkers = len(toProcess) + } + + type indexedResult struct { + idx int + result source.ParseResult + } + + work := make(chan int, len(toProcess)) + results := make([]source.ParseResult, len(toProcess)) + var wg sync.WaitGroup + var processed atomic.Int64 + + // Feed work + for i := range toProcess { + work <- i + } + close(work) + + // Spawn workers + wg.Add(numWorkers) + for w := 0; w < numWorkers; w++ { + go func() { + defer wg.Done() + for idx := range work { + results[idx] = source.ParseFile(toProcess[idx]) + n := processed.Add(1) + if progressFn != nil { + progressFn(int(n), len(toProcess)) + } + } + }() + } + + wg.Wait() + + // Collect results + for _, pr := range results { + if pr.Err != nil { + result.FileErrors++ + continue + } + result.ParsedFiles++ + result.ParseErrors += pr.ParseErrors + if pr.Stats.APICalls > 0 || pr.Stats.UserMessages > 0 { + result.Sessions = append(result.Sessions, pr.Stats) + } + } + + return result, nil +}