From 8c1beb7a8a13498991c8ba86a1b8b0b3ec0c6c5e Mon Sep 17 00:00:00 2001 From: teernisse Date: Sat, 28 Feb 2026 00:04:55 -0500 Subject: [PATCH] feat(daemon): add background usage monitor with HTTP/SSE API Implement a long-running daemon service that continuously polls Claude Code session logs and exposes usage data via local HTTP endpoints. Architecture: - internal/daemon/service.go: Core Service struct managing poll loop, snapshot computation, event buffering, and HTTP handlers - cmd/daemon.go: Cobra commands for start/status/stop with detach mode HTTP Endpoints (default 127.0.0.1:8787): - GET /healthz - Liveness probe for orchestration - GET /v1/status - Current aggregate snapshot + daemon runtime info - GET /v1/events - Recent event buffer as JSON array - GET /v1/stream - Server-Sent Events for real-time updates Snapshot model captures: - Session/prompt/API call counts - Token totals and estimated cost - Cache hit rate - Rolling daily averages (cost/day, tokens/day, sessions/day) Delta detection emits events only when usage actually changes, keeping the event stream lean for downstream consumers. Detach mode (-d, --detach): - Forks a child process with stdout/stderr redirected to log file - Writes PID file for process management - Parent exits after confirming child is running This daemon serves as the foundation for planned capabilities like incident replay, runaway detection, and session classification. Co-Authored-By: Claude Opus 4.5 --- cmd/daemon.go | 344 +++++++++++++++++++++++++++ internal/daemon/service.go | 398 ++++++++++++++++++++++++++++++++ internal/daemon/service_test.go | 66 ++++++ 3 files changed, 808 insertions(+) create mode 100644 cmd/daemon.go create mode 100644 internal/daemon/service.go create mode 100644 internal/daemon/service_test.go diff --git a/cmd/daemon.go b/cmd/daemon.go new file mode 100644 index 0000000..b819967 --- /dev/null +++ b/cmd/daemon.go @@ -0,0 +1,344 @@ +package cmd + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "net/http" + "os" + "os/exec" + "os/signal" + "path/filepath" + "strconv" + "strings" + "syscall" + "time" + + "github.com/theirongolddev/cburn/internal/daemon" + "github.com/theirongolddev/cburn/internal/pipeline" + + "github.com/spf13/cobra" +) + +type daemonRuntimeState struct { + PID int `json:"pid"` + Addr string `json:"addr"` + StartedAt time.Time `json:"started_at"` + DataDir string `json:"data_dir"` +} + +var ( + flagDaemonAddr string + flagDaemonInterval time.Duration + flagDaemonDetach bool + flagDaemonPIDFile string + flagDaemonLogFile string + flagDaemonEventsBuffer int + flagDaemonChild bool +) + +var daemonCmd = &cobra.Command{ + Use: "daemon", + Short: "Run a background usage daemon with HTTP/SSE endpoints", + RunE: runDaemon, +} + +var daemonStatusCmd = &cobra.Command{ + Use: "status", + Short: "Show daemon process and API status", + RunE: runDaemonStatus, +} + +var daemonStopCmd = &cobra.Command{ + Use: "stop", + Short: "Stop the running daemon", + RunE: runDaemonStop, +} + +func init() { + defaultPID := filepath.Join(pipeline.CacheDir(), "cburnd.pid") + defaultLog := filepath.Join(pipeline.CacheDir(), "cburnd.log") + + daemonCmd.PersistentFlags().StringVar(&flagDaemonAddr, "addr", "127.0.0.1:8787", "HTTP listen address") + daemonCmd.PersistentFlags().DurationVar(&flagDaemonInterval, "interval", 15*time.Second, "Polling interval") + daemonCmd.PersistentFlags().StringVar(&flagDaemonPIDFile, "pid-file", defaultPID, "PID file path") + daemonCmd.PersistentFlags().StringVar(&flagDaemonLogFile, "log-file", defaultLog, "Log file path for detached mode") + daemonCmd.PersistentFlags().IntVar(&flagDaemonEventsBuffer, "events-buffer", 200, "Max in-memory events retained") + + daemonCmd.Flags().BoolVar(&flagDaemonDetach, "detach", false, "Run daemon as a background process") + daemonCmd.Flags().BoolVar(&flagDaemonChild, "child", false, "Internal: mark detached child process") + _ = daemonCmd.Flags().MarkHidden("child") + + daemonCmd.AddCommand(daemonStatusCmd) + daemonCmd.AddCommand(daemonStopCmd) + rootCmd.AddCommand(daemonCmd) +} + +func runDaemon(_ *cobra.Command, _ []string) error { + if flagDaemonDetach && flagDaemonChild { + return errors.New("invalid daemon launch mode") + } + + if flagDaemonDetach { + return startDaemonDetached() + } + + return runDaemonForeground() +} + +func startDaemonDetached() error { + if err := ensureDaemonNotRunning(flagDaemonPIDFile); err != nil { + return err + } + + exe, err := os.Executable() + if err != nil { + return fmt.Errorf("resolve executable: %w", err) + } + + args := filterDetachArg(os.Args[1:]) + args = append(args, "--child") + + if err := os.MkdirAll(filepath.Dir(flagDaemonPIDFile), 0o750); err != nil { + return fmt.Errorf("create daemon directory: %w", err) + } + if err := os.MkdirAll(filepath.Dir(flagDaemonLogFile), 0o750); err != nil { + return fmt.Errorf("create daemon log directory: %w", err) + } + + //nolint:gosec // daemon log path is configured by the local user + logf, err := os.OpenFile(flagDaemonLogFile, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0o600) + if err != nil { + return fmt.Errorf("open daemon log file: %w", err) + } + defer func() { _ = logf.Close() }() + + cmd := exec.Command(exe, args...) //nolint:gosec // exe/args come from current process invocation + cmd.Stdout = logf + cmd.Stderr = logf + cmd.Stdin = nil + cmd.Env = os.Environ() + + if err := cmd.Start(); err != nil { + return fmt.Errorf("start detached daemon: %w", err) + } + + fmt.Printf(" Started daemon (pid %d)\n", cmd.Process.Pid) + fmt.Printf(" PID file: %s\n", flagDaemonPIDFile) + fmt.Printf(" API: http://%s/v1/status\n", flagDaemonAddr) + fmt.Printf(" Log: %s\n", flagDaemonLogFile) + return nil +} + +func runDaemonForeground() error { + if err := ensureDaemonNotRunning(flagDaemonPIDFile); err != nil { + return err + } + + if err := os.MkdirAll(filepath.Dir(flagDaemonPIDFile), 0o750); err != nil { + return fmt.Errorf("create daemon directory: %w", err) + } + + pid := os.Getpid() + if err := writePID(flagDaemonPIDFile, pid); err != nil { + return err + } + defer func() { _ = os.Remove(flagDaemonPIDFile) }() + + state := daemonRuntimeState{ + PID: pid, + Addr: flagDaemonAddr, + StartedAt: time.Now(), + DataDir: flagDataDir, + } + _ = writeState(statePath(flagDaemonPIDFile), state) + defer func() { _ = os.Remove(statePath(flagDaemonPIDFile)) }() + + cfg := daemon.Config{ + DataDir: flagDataDir, + Days: flagDays, + ProjectFilter: flagProject, + ModelFilter: flagModel, + IncludeSubagents: !flagNoSubagents, + UseCache: !flagNoCache, + Interval: flagDaemonInterval, + Addr: flagDaemonAddr, + EventsBuffer: flagDaemonEventsBuffer, + } + svc := daemon.New(cfg) + + fmt.Printf(" cburn daemon listening on http://%s\n", flagDaemonAddr) + fmt.Printf(" Polling every %s from %s\n", flagDaemonInterval, flagDataDir) + fmt.Printf(" Stop with: cburn daemon stop --pid-file %s\n", flagDaemonPIDFile) + + ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM) + defer cancel() + + if err := svc.Run(ctx); err != nil && !errors.Is(err, context.Canceled) { + return err + } + return nil +} + +func runDaemonStatus(_ *cobra.Command, _ []string) error { + pid, err := readPID(flagDaemonPIDFile) + if err != nil { + fmt.Printf(" Daemon: not running (pid file not found)\n") + return nil + } + + alive := processAlive(pid) + if !alive { + fmt.Printf(" Daemon: stale pid file (pid %d not alive)\n", pid) + return nil + } + + addr := flagDaemonAddr + if st, err := readState(statePath(flagDaemonPIDFile)); err == nil && st.Addr != "" { + addr = st.Addr + } + + fmt.Printf(" Daemon PID: %d\n", pid) + fmt.Printf(" Address: http://%s\n", addr) + + client := &http.Client{Timeout: 2 * time.Second} + resp, err := client.Get("http://" + addr + "/v1/status") //nolint:noctx // short status probe + if err != nil { + fmt.Printf(" API status: unreachable (%v)\n", err) + return nil + } + defer func() { _ = resp.Body.Close() }() + + if resp.StatusCode != http.StatusOK { + fmt.Printf(" API status: HTTP %d\n", resp.StatusCode) + return nil + } + + var st daemon.Status + if err := json.NewDecoder(resp.Body).Decode(&st); err != nil { + fmt.Printf(" API status: malformed response (%v)\n", err) + return nil + } + + if st.LastPollAt.IsZero() { + fmt.Printf(" Last poll: pending\n") + } else { + fmt.Printf(" Last poll: %s\n", st.LastPollAt.Local().Format(time.RFC3339)) + } + fmt.Printf(" Poll count: %d\n", st.PollCount) + fmt.Printf(" Sessions: %d\n", st.Summary.Sessions) + fmt.Printf(" Tokens: %d\n", st.Summary.Tokens) + fmt.Printf(" Cost: $%.2f\n", st.Summary.EstimatedCostUSD) + if st.LastError != "" { + fmt.Printf(" Last error: %s\n", st.LastError) + } + return nil +} + +func runDaemonStop(_ *cobra.Command, _ []string) error { + pid, err := readPID(flagDaemonPIDFile) + if err != nil { + return errors.New("daemon is not running") + } + + proc, err := os.FindProcess(pid) + if err != nil { + return fmt.Errorf("find daemon process: %w", err) + } + if err := proc.Signal(syscall.SIGTERM); err != nil { + return fmt.Errorf("signal daemon process: %w", err) + } + + deadline := time.Now().Add(8 * time.Second) + for time.Now().Before(deadline) { + if !processAlive(pid) { + _ = os.Remove(flagDaemonPIDFile) + _ = os.Remove(statePath(flagDaemonPIDFile)) + fmt.Printf(" Stopped daemon (pid %d)\n", pid) + return nil + } + time.Sleep(150 * time.Millisecond) + } + + return fmt.Errorf("daemon (pid %d) did not exit in time", pid) +} + +func filterDetachArg(args []string) []string { + out := make([]string, 0, len(args)) + for _, a := range args { + if a == "--detach" || strings.HasPrefix(a, "--detach=") { + continue + } + out = append(out, a) + } + return out +} + +func ensureDaemonNotRunning(pidFile string) error { + pid, err := readPID(pidFile) + if err != nil { + if errors.Is(err, os.ErrNotExist) { + return nil + } + return err + } + if processAlive(pid) { + return fmt.Errorf("daemon already running (pid %d)", pid) + } + _ = os.Remove(pidFile) + _ = os.Remove(statePath(pidFile)) + return nil +} + +func writePID(path string, pid int) error { + return os.WriteFile(path, []byte(strconv.Itoa(pid)+"\n"), 0o600) +} + +func readPID(path string) (int, error) { + //nolint:gosec // daemon pid path is configured by the local user + data, err := os.ReadFile(path) + if err != nil { + return 0, err + } + pidStr := strings.TrimSpace(string(data)) + pid, err := strconv.Atoi(pidStr) + if err != nil || pid <= 0 { + return 0, fmt.Errorf("invalid pid in %s", path) + } + return pid, nil +} + +func processAlive(pid int) bool { + proc, err := os.FindProcess(pid) + if err != nil { + return false + } + err = proc.Signal(syscall.Signal(0)) + return err == nil || errors.Is(err, syscall.EPERM) +} + +func statePath(pidFile string) string { + return pidFile + ".json" +} + +func writeState(path string, st daemonRuntimeState) error { + data, err := json.MarshalIndent(st, "", " ") + if err != nil { + return err + } + return os.WriteFile(path, append(data, '\n'), 0o600) +} + +func readState(path string) (daemonRuntimeState, error) { + var st daemonRuntimeState + //nolint:gosec // daemon state path is configured by the local user + data, err := os.ReadFile(path) + if err != nil { + return st, err + } + if err := json.Unmarshal(data, &st); err != nil { + return st, err + } + return st, nil +} diff --git a/internal/daemon/service.go b/internal/daemon/service.go new file mode 100644 index 0000000..3c030ae --- /dev/null +++ b/internal/daemon/service.go @@ -0,0 +1,398 @@ +// Package daemon provides the long-running background usage monitor service. +package daemon + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "log" + "net/http" + "sync" + "time" + + "github.com/theirongolddev/cburn/internal/model" + "github.com/theirongolddev/cburn/internal/pipeline" + "github.com/theirongolddev/cburn/internal/store" +) + +// Config controls the daemon runtime behavior. +type Config struct { + DataDir string + Days int + ProjectFilter string + ModelFilter string + IncludeSubagents bool + UseCache bool + Interval time.Duration + Addr string + EventsBuffer int +} + +// Snapshot is a compact usage state for status/event payloads. +type Snapshot struct { + At time.Time `json:"at"` + Sessions int `json:"sessions"` + Prompts int `json:"prompts"` + APICalls int `json:"api_calls"` + Tokens int64 `json:"tokens"` + EstimatedCostUSD float64 `json:"estimated_cost_usd"` + CacheHitRate float64 `json:"cache_hit_rate"` + CostPerDayUSD float64 `json:"cost_per_day_usd"` + TokensPerDay int64 `json:"tokens_per_day"` + SessionsPerDay float64 `json:"sessions_per_day"` +} + +// Delta captures snapshot deltas between polls. +type Delta struct { + Sessions int `json:"sessions"` + Prompts int `json:"prompts"` + APICalls int `json:"api_calls"` + Tokens int64 `json:"tokens"` + EstimatedCostUSD float64 `json:"estimated_cost_usd"` +} + +func (d Delta) isZero() bool { + return d.Sessions == 0 && + d.Prompts == 0 && + d.APICalls == 0 && + d.Tokens == 0 && + d.EstimatedCostUSD == 0 +} + +// Event is emitted whenever usage snapshot updates. +type Event struct { + ID int64 `json:"id"` + Type string `json:"type"` + Timestamp time.Time `json:"timestamp"` + Snapshot Snapshot `json:"snapshot"` + Delta Delta `json:"delta"` +} + +// Status is served at /v1/status. +type Status struct { + StartedAt time.Time `json:"started_at"` + LastPollAt time.Time `json:"last_poll_at"` + PollIntervalSec int `json:"poll_interval_sec"` + PollCount int64 `json:"poll_count"` + DataDir string `json:"data_dir"` + Days int `json:"days"` + ProjectFilter string `json:"project_filter,omitempty"` + ModelFilter string `json:"model_filter,omitempty"` + Summary Snapshot `json:"summary"` + LastError string `json:"last_error,omitempty"` + EventCount int `json:"event_count"` + SubscriberCount int `json:"subscriber_count"` +} + +// Service provides the daemon runtime and HTTP API. +type Service struct { + cfg Config + + mu sync.RWMutex + startedAt time.Time + lastPollAt time.Time + pollCount int64 + lastError string + hasSnapshot bool + snapshot Snapshot + nextEventID int64 + events []Event + + nextSubID int + subs map[int]chan Event +} + +// New returns a new daemon service with the provided config. +func New(cfg Config) *Service { + if cfg.Interval < 2*time.Second { + cfg.Interval = 10 * time.Second + } + if cfg.EventsBuffer < 1 { + cfg.EventsBuffer = 200 + } + if cfg.Addr == "" { + cfg.Addr = "127.0.0.1:8787" + } + + return &Service{ + cfg: cfg, + startedAt: time.Now(), + subs: make(map[int]chan Event), + } +} + +// Run starts HTTP endpoints and polling until ctx is canceled. +func (s *Service) Run(ctx context.Context) error { + mux := http.NewServeMux() + mux.HandleFunc("/healthz", s.handleHealth) + mux.HandleFunc("/v1/status", s.handleStatus) + mux.HandleFunc("/v1/events", s.handleEvents) + mux.HandleFunc("/v1/stream", s.handleStream) + + server := &http.Server{ + Addr: s.cfg.Addr, + Handler: mux, + ReadHeaderTimeout: 5 * time.Second, + } + + errCh := make(chan error, 1) + go func() { + if err := server.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) { + errCh <- err + } + }() + + // Seed initial snapshot so status is useful immediately. + s.pollOnce() + + ticker := time.NewTicker(s.cfg.Interval) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + return server.Shutdown(shutdownCtx) + case <-ticker.C: + s.pollOnce() + case err := <-errCh: + return fmt.Errorf("daemon http server: %w", err) + } + } +} + +func (s *Service) pollOnce() { + start := time.Now() + sessions, err := s.loadSessions() + if err != nil { + s.mu.Lock() + s.lastError = err.Error() + s.lastPollAt = time.Now() + s.pollCount++ + s.mu.Unlock() + log.Printf("cburn daemon poll error: %v", err) + return + } + + now := time.Now() + since := now.AddDate(0, 0, -s.cfg.Days) + + filtered := sessions + if s.cfg.ProjectFilter != "" { + filtered = pipeline.FilterByProject(filtered, s.cfg.ProjectFilter) + } + if s.cfg.ModelFilter != "" { + filtered = pipeline.FilterByModel(filtered, s.cfg.ModelFilter) + } + + stats := pipeline.Aggregate(filtered, since, now) + snap := snapshotFromSummary(stats, now) + + var ( + ev Event + publish bool + ) + + s.mu.Lock() + prev := s.snapshot + prevExists := s.hasSnapshot + + s.hasSnapshot = true + s.snapshot = snap + s.lastPollAt = now + s.pollCount++ + s.lastError = "" + + if !prevExists { + s.nextEventID++ + ev = Event{ + ID: s.nextEventID, + Type: "snapshot", + Timestamp: now, + Snapshot: snap, + Delta: Delta{}, + } + publish = true + } else { + delta := diffSnapshots(prev, snap) + if !delta.isZero() { + s.nextEventID++ + ev = Event{ + ID: s.nextEventID, + Type: "usage_delta", + Timestamp: now, + Snapshot: snap, + Delta: delta, + } + publish = true + } + } + s.mu.Unlock() + + if publish { + s.publishEvent(ev) + } + + _ = start +} + +func (s *Service) loadSessions() ([]model.SessionStats, error) { + if s.cfg.UseCache { + cache, err := store.Open(pipeline.CachePath()) + if err == nil { + defer func() { _ = cache.Close() }() + cr, loadErr := pipeline.LoadWithCache(s.cfg.DataDir, s.cfg.IncludeSubagents, cache, nil) + if loadErr == nil { + return cr.Sessions, nil + } + } + } + + result, err := pipeline.Load(s.cfg.DataDir, s.cfg.IncludeSubagents, nil) + if err != nil { + return nil, err + } + return result.Sessions, nil +} + +func snapshotFromSummary(stats model.SummaryStats, at time.Time) Snapshot { + return Snapshot{ + At: at, + Sessions: stats.TotalSessions, + Prompts: stats.TotalPrompts, + APICalls: stats.TotalAPICalls, + Tokens: stats.TotalBilledTokens, + EstimatedCostUSD: stats.EstimatedCost, + CacheHitRate: stats.CacheHitRate, + CostPerDayUSD: stats.CostPerDay, + TokensPerDay: stats.TokensPerDay, + SessionsPerDay: stats.SessionsPerDay, + } +} + +func diffSnapshots(prev, curr Snapshot) Delta { + return Delta{ + Sessions: curr.Sessions - prev.Sessions, + Prompts: curr.Prompts - prev.Prompts, + APICalls: curr.APICalls - prev.APICalls, + Tokens: curr.Tokens - prev.Tokens, + EstimatedCostUSD: curr.EstimatedCostUSD - prev.EstimatedCostUSD, + } +} + +func (s *Service) publishEvent(ev Event) { + s.mu.Lock() + s.events = append(s.events, ev) + if len(s.events) > s.cfg.EventsBuffer { + s.events = s.events[len(s.events)-s.cfg.EventsBuffer:] + } + + for _, ch := range s.subs { + select { + case ch <- ev: + default: + } + } + s.mu.Unlock() +} + +func (s *Service) snapshotStatus() Status { + s.mu.RLock() + defer s.mu.RUnlock() + + return Status{ + StartedAt: s.startedAt, + LastPollAt: s.lastPollAt, + PollIntervalSec: int(s.cfg.Interval.Seconds()), + PollCount: s.pollCount, + DataDir: s.cfg.DataDir, + Days: s.cfg.Days, + ProjectFilter: s.cfg.ProjectFilter, + ModelFilter: s.cfg.ModelFilter, + Summary: s.snapshot, + LastError: s.lastError, + EventCount: len(s.events), + SubscriberCount: len(s.subs), + } +} + +func (s *Service) handleHealth(w http.ResponseWriter, _ *http.Request) { + w.Header().Set("Content-Type", "text/plain; charset=utf-8") + _, _ = w.Write([]byte("ok\n")) +} + +func (s *Service) handleStatus(w http.ResponseWriter, _ *http.Request) { + w.Header().Set("Content-Type", "application/json") + _ = json.NewEncoder(w).Encode(s.snapshotStatus()) +} + +func (s *Service) handleEvents(w http.ResponseWriter, _ *http.Request) { + s.mu.RLock() + events := make([]Event, len(s.events)) + copy(events, s.events) + s.mu.RUnlock() + + w.Header().Set("Content-Type", "application/json") + _ = json.NewEncoder(w).Encode(events) +} + +func (s *Service) handleStream(w http.ResponseWriter, r *http.Request) { + flusher, ok := w.(http.Flusher) + if !ok { + http.Error(w, "streaming unsupported", http.StatusInternalServerError) + return + } + + w.Header().Set("Content-Type", "text/event-stream") + w.Header().Set("Cache-Control", "no-cache") + w.Header().Set("Connection", "keep-alive") + + ch := make(chan Event, 16) + id := s.addSubscriber(ch) + defer s.removeSubscriber(id) + + // Send current snapshot immediately. + current := Event{ + Type: "snapshot", + Timestamp: time.Now(), + Snapshot: s.snapshotStatus().Summary, + } + writeSSE(w, current) + flusher.Flush() + + for { + select { + case <-r.Context().Done(): + return + case ev := <-ch: + writeSSE(w, ev) + flusher.Flush() + } + } +} + +func writeSSE(w http.ResponseWriter, ev Event) { + data, err := json.Marshal(ev) + if err != nil { + return + } + _, _ = fmt.Fprintf(w, "event: %s\n", ev.Type) + _, _ = fmt.Fprintf(w, "data: %s\n\n", data) +} + +func (s *Service) addSubscriber(ch chan Event) int { + s.mu.Lock() + defer s.mu.Unlock() + s.nextSubID++ + id := s.nextSubID + s.subs[id] = ch + return id +} + +func (s *Service) removeSubscriber(id int) { + s.mu.Lock() + defer s.mu.Unlock() + delete(s.subs, id) +} diff --git a/internal/daemon/service_test.go b/internal/daemon/service_test.go new file mode 100644 index 0000000..e574da5 --- /dev/null +++ b/internal/daemon/service_test.go @@ -0,0 +1,66 @@ +package daemon + +import ( + "math" + "testing" + "time" +) + +func TestDiffSnapshots(t *testing.T) { + prev := Snapshot{ + Sessions: 10, + Prompts: 100, + APICalls: 120, + Tokens: 1_000_000, + EstimatedCostUSD: 10.5, + } + curr := Snapshot{ + Sessions: 12, + Prompts: 112, + APICalls: 136, + Tokens: 1_250_000, + EstimatedCostUSD: 13.1, + } + + delta := diffSnapshots(prev, curr) + if delta.Sessions != 2 { + t.Fatalf("Sessions delta = %d, want 2", delta.Sessions) + } + if delta.Prompts != 12 { + t.Fatalf("Prompts delta = %d, want 12", delta.Prompts) + } + if delta.APICalls != 16 { + t.Fatalf("APICalls delta = %d, want 16", delta.APICalls) + } + if delta.Tokens != 250_000 { + t.Fatalf("Tokens delta = %d, want 250000", delta.Tokens) + } + if math.Abs(delta.EstimatedCostUSD-2.6) > 1e-9 { + t.Fatalf("Cost delta = %.2f, want 2.60", delta.EstimatedCostUSD) + } + if delta.isZero() { + t.Fatal("delta unexpectedly reported as zero") + } +} + +func TestPublishEventRingBuffer(t *testing.T) { + s := New(Config{ + DataDir: ".", + Interval: 10 * time.Second, + EventsBuffer: 2, + }) + + s.publishEvent(Event{ID: 1}) + s.publishEvent(Event{ID: 2}) + s.publishEvent(Event{ID: 3}) + + s.mu.RLock() + defer s.mu.RUnlock() + + if len(s.events) != 2 { + t.Fatalf("events len = %d, want 2", len(s.events)) + } + if s.events[0].ID != 2 || s.events[1].ID != 3 { + t.Fatalf("events ring contains IDs [%d, %d], want [2, 3]", s.events[0].ID, s.events[1].ID) + } +}