feat(daemon): add background usage monitor with HTTP/SSE API
Implement a long-running daemon service that continuously polls Claude Code session logs and exposes usage data via local HTTP endpoints. Architecture: - internal/daemon/service.go: Core Service struct managing poll loop, snapshot computation, event buffering, and HTTP handlers - cmd/daemon.go: Cobra commands for start/status/stop with detach mode HTTP Endpoints (default 127.0.0.1:8787): - GET /healthz - Liveness probe for orchestration - GET /v1/status - Current aggregate snapshot + daemon runtime info - GET /v1/events - Recent event buffer as JSON array - GET /v1/stream - Server-Sent Events for real-time updates Snapshot model captures: - Session/prompt/API call counts - Token totals and estimated cost - Cache hit rate - Rolling daily averages (cost/day, tokens/day, sessions/day) Delta detection emits events only when usage actually changes, keeping the event stream lean for downstream consumers. Detach mode (-d, --detach): - Forks a child process with stdout/stderr redirected to log file - Writes PID file for process management - Parent exits after confirming child is running This daemon serves as the foundation for planned capabilities like incident replay, runaway detection, and session classification. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
398
internal/daemon/service.go
Normal file
398
internal/daemon/service.go
Normal file
@@ -0,0 +1,398 @@
|
||||
// Package daemon provides the long-running background usage monitor service.
|
||||
package daemon
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"log"
|
||||
"net/http"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/theirongolddev/cburn/internal/model"
|
||||
"github.com/theirongolddev/cburn/internal/pipeline"
|
||||
"github.com/theirongolddev/cburn/internal/store"
|
||||
)
|
||||
|
||||
// Config controls the daemon runtime behavior.
|
||||
type Config struct {
|
||||
DataDir string
|
||||
Days int
|
||||
ProjectFilter string
|
||||
ModelFilter string
|
||||
IncludeSubagents bool
|
||||
UseCache bool
|
||||
Interval time.Duration
|
||||
Addr string
|
||||
EventsBuffer int
|
||||
}
|
||||
|
||||
// Snapshot is a compact usage state for status/event payloads.
|
||||
type Snapshot struct {
|
||||
At time.Time `json:"at"`
|
||||
Sessions int `json:"sessions"`
|
||||
Prompts int `json:"prompts"`
|
||||
APICalls int `json:"api_calls"`
|
||||
Tokens int64 `json:"tokens"`
|
||||
EstimatedCostUSD float64 `json:"estimated_cost_usd"`
|
||||
CacheHitRate float64 `json:"cache_hit_rate"`
|
||||
CostPerDayUSD float64 `json:"cost_per_day_usd"`
|
||||
TokensPerDay int64 `json:"tokens_per_day"`
|
||||
SessionsPerDay float64 `json:"sessions_per_day"`
|
||||
}
|
||||
|
||||
// Delta captures snapshot deltas between polls.
|
||||
type Delta struct {
|
||||
Sessions int `json:"sessions"`
|
||||
Prompts int `json:"prompts"`
|
||||
APICalls int `json:"api_calls"`
|
||||
Tokens int64 `json:"tokens"`
|
||||
EstimatedCostUSD float64 `json:"estimated_cost_usd"`
|
||||
}
|
||||
|
||||
func (d Delta) isZero() bool {
|
||||
return d.Sessions == 0 &&
|
||||
d.Prompts == 0 &&
|
||||
d.APICalls == 0 &&
|
||||
d.Tokens == 0 &&
|
||||
d.EstimatedCostUSD == 0
|
||||
}
|
||||
|
||||
// Event is emitted whenever usage snapshot updates.
|
||||
type Event struct {
|
||||
ID int64 `json:"id"`
|
||||
Type string `json:"type"`
|
||||
Timestamp time.Time `json:"timestamp"`
|
||||
Snapshot Snapshot `json:"snapshot"`
|
||||
Delta Delta `json:"delta"`
|
||||
}
|
||||
|
||||
// Status is served at /v1/status.
|
||||
type Status struct {
|
||||
StartedAt time.Time `json:"started_at"`
|
||||
LastPollAt time.Time `json:"last_poll_at"`
|
||||
PollIntervalSec int `json:"poll_interval_sec"`
|
||||
PollCount int64 `json:"poll_count"`
|
||||
DataDir string `json:"data_dir"`
|
||||
Days int `json:"days"`
|
||||
ProjectFilter string `json:"project_filter,omitempty"`
|
||||
ModelFilter string `json:"model_filter,omitempty"`
|
||||
Summary Snapshot `json:"summary"`
|
||||
LastError string `json:"last_error,omitempty"`
|
||||
EventCount int `json:"event_count"`
|
||||
SubscriberCount int `json:"subscriber_count"`
|
||||
}
|
||||
|
||||
// Service provides the daemon runtime and HTTP API.
|
||||
type Service struct {
|
||||
cfg Config
|
||||
|
||||
mu sync.RWMutex
|
||||
startedAt time.Time
|
||||
lastPollAt time.Time
|
||||
pollCount int64
|
||||
lastError string
|
||||
hasSnapshot bool
|
||||
snapshot Snapshot
|
||||
nextEventID int64
|
||||
events []Event
|
||||
|
||||
nextSubID int
|
||||
subs map[int]chan Event
|
||||
}
|
||||
|
||||
// New returns a new daemon service with the provided config.
|
||||
func New(cfg Config) *Service {
|
||||
if cfg.Interval < 2*time.Second {
|
||||
cfg.Interval = 10 * time.Second
|
||||
}
|
||||
if cfg.EventsBuffer < 1 {
|
||||
cfg.EventsBuffer = 200
|
||||
}
|
||||
if cfg.Addr == "" {
|
||||
cfg.Addr = "127.0.0.1:8787"
|
||||
}
|
||||
|
||||
return &Service{
|
||||
cfg: cfg,
|
||||
startedAt: time.Now(),
|
||||
subs: make(map[int]chan Event),
|
||||
}
|
||||
}
|
||||
|
||||
// Run starts HTTP endpoints and polling until ctx is canceled.
|
||||
func (s *Service) Run(ctx context.Context) error {
|
||||
mux := http.NewServeMux()
|
||||
mux.HandleFunc("/healthz", s.handleHealth)
|
||||
mux.HandleFunc("/v1/status", s.handleStatus)
|
||||
mux.HandleFunc("/v1/events", s.handleEvents)
|
||||
mux.HandleFunc("/v1/stream", s.handleStream)
|
||||
|
||||
server := &http.Server{
|
||||
Addr: s.cfg.Addr,
|
||||
Handler: mux,
|
||||
ReadHeaderTimeout: 5 * time.Second,
|
||||
}
|
||||
|
||||
errCh := make(chan error, 1)
|
||||
go func() {
|
||||
if err := server.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
|
||||
errCh <- err
|
||||
}
|
||||
}()
|
||||
|
||||
// Seed initial snapshot so status is useful immediately.
|
||||
s.pollOnce()
|
||||
|
||||
ticker := time.NewTicker(s.cfg.Interval)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
defer cancel()
|
||||
return server.Shutdown(shutdownCtx)
|
||||
case <-ticker.C:
|
||||
s.pollOnce()
|
||||
case err := <-errCh:
|
||||
return fmt.Errorf("daemon http server: %w", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Service) pollOnce() {
|
||||
start := time.Now()
|
||||
sessions, err := s.loadSessions()
|
||||
if err != nil {
|
||||
s.mu.Lock()
|
||||
s.lastError = err.Error()
|
||||
s.lastPollAt = time.Now()
|
||||
s.pollCount++
|
||||
s.mu.Unlock()
|
||||
log.Printf("cburn daemon poll error: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
now := time.Now()
|
||||
since := now.AddDate(0, 0, -s.cfg.Days)
|
||||
|
||||
filtered := sessions
|
||||
if s.cfg.ProjectFilter != "" {
|
||||
filtered = pipeline.FilterByProject(filtered, s.cfg.ProjectFilter)
|
||||
}
|
||||
if s.cfg.ModelFilter != "" {
|
||||
filtered = pipeline.FilterByModel(filtered, s.cfg.ModelFilter)
|
||||
}
|
||||
|
||||
stats := pipeline.Aggregate(filtered, since, now)
|
||||
snap := snapshotFromSummary(stats, now)
|
||||
|
||||
var (
|
||||
ev Event
|
||||
publish bool
|
||||
)
|
||||
|
||||
s.mu.Lock()
|
||||
prev := s.snapshot
|
||||
prevExists := s.hasSnapshot
|
||||
|
||||
s.hasSnapshot = true
|
||||
s.snapshot = snap
|
||||
s.lastPollAt = now
|
||||
s.pollCount++
|
||||
s.lastError = ""
|
||||
|
||||
if !prevExists {
|
||||
s.nextEventID++
|
||||
ev = Event{
|
||||
ID: s.nextEventID,
|
||||
Type: "snapshot",
|
||||
Timestamp: now,
|
||||
Snapshot: snap,
|
||||
Delta: Delta{},
|
||||
}
|
||||
publish = true
|
||||
} else {
|
||||
delta := diffSnapshots(prev, snap)
|
||||
if !delta.isZero() {
|
||||
s.nextEventID++
|
||||
ev = Event{
|
||||
ID: s.nextEventID,
|
||||
Type: "usage_delta",
|
||||
Timestamp: now,
|
||||
Snapshot: snap,
|
||||
Delta: delta,
|
||||
}
|
||||
publish = true
|
||||
}
|
||||
}
|
||||
s.mu.Unlock()
|
||||
|
||||
if publish {
|
||||
s.publishEvent(ev)
|
||||
}
|
||||
|
||||
_ = start
|
||||
}
|
||||
|
||||
func (s *Service) loadSessions() ([]model.SessionStats, error) {
|
||||
if s.cfg.UseCache {
|
||||
cache, err := store.Open(pipeline.CachePath())
|
||||
if err == nil {
|
||||
defer func() { _ = cache.Close() }()
|
||||
cr, loadErr := pipeline.LoadWithCache(s.cfg.DataDir, s.cfg.IncludeSubagents, cache, nil)
|
||||
if loadErr == nil {
|
||||
return cr.Sessions, nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
result, err := pipeline.Load(s.cfg.DataDir, s.cfg.IncludeSubagents, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return result.Sessions, nil
|
||||
}
|
||||
|
||||
func snapshotFromSummary(stats model.SummaryStats, at time.Time) Snapshot {
|
||||
return Snapshot{
|
||||
At: at,
|
||||
Sessions: stats.TotalSessions,
|
||||
Prompts: stats.TotalPrompts,
|
||||
APICalls: stats.TotalAPICalls,
|
||||
Tokens: stats.TotalBilledTokens,
|
||||
EstimatedCostUSD: stats.EstimatedCost,
|
||||
CacheHitRate: stats.CacheHitRate,
|
||||
CostPerDayUSD: stats.CostPerDay,
|
||||
TokensPerDay: stats.TokensPerDay,
|
||||
SessionsPerDay: stats.SessionsPerDay,
|
||||
}
|
||||
}
|
||||
|
||||
func diffSnapshots(prev, curr Snapshot) Delta {
|
||||
return Delta{
|
||||
Sessions: curr.Sessions - prev.Sessions,
|
||||
Prompts: curr.Prompts - prev.Prompts,
|
||||
APICalls: curr.APICalls - prev.APICalls,
|
||||
Tokens: curr.Tokens - prev.Tokens,
|
||||
EstimatedCostUSD: curr.EstimatedCostUSD - prev.EstimatedCostUSD,
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Service) publishEvent(ev Event) {
|
||||
s.mu.Lock()
|
||||
s.events = append(s.events, ev)
|
||||
if len(s.events) > s.cfg.EventsBuffer {
|
||||
s.events = s.events[len(s.events)-s.cfg.EventsBuffer:]
|
||||
}
|
||||
|
||||
for _, ch := range s.subs {
|
||||
select {
|
||||
case ch <- ev:
|
||||
default:
|
||||
}
|
||||
}
|
||||
s.mu.Unlock()
|
||||
}
|
||||
|
||||
func (s *Service) snapshotStatus() Status {
|
||||
s.mu.RLock()
|
||||
defer s.mu.RUnlock()
|
||||
|
||||
return Status{
|
||||
StartedAt: s.startedAt,
|
||||
LastPollAt: s.lastPollAt,
|
||||
PollIntervalSec: int(s.cfg.Interval.Seconds()),
|
||||
PollCount: s.pollCount,
|
||||
DataDir: s.cfg.DataDir,
|
||||
Days: s.cfg.Days,
|
||||
ProjectFilter: s.cfg.ProjectFilter,
|
||||
ModelFilter: s.cfg.ModelFilter,
|
||||
Summary: s.snapshot,
|
||||
LastError: s.lastError,
|
||||
EventCount: len(s.events),
|
||||
SubscriberCount: len(s.subs),
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Service) handleHealth(w http.ResponseWriter, _ *http.Request) {
|
||||
w.Header().Set("Content-Type", "text/plain; charset=utf-8")
|
||||
_, _ = w.Write([]byte("ok\n"))
|
||||
}
|
||||
|
||||
func (s *Service) handleStatus(w http.ResponseWriter, _ *http.Request) {
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
_ = json.NewEncoder(w).Encode(s.snapshotStatus())
|
||||
}
|
||||
|
||||
func (s *Service) handleEvents(w http.ResponseWriter, _ *http.Request) {
|
||||
s.mu.RLock()
|
||||
events := make([]Event, len(s.events))
|
||||
copy(events, s.events)
|
||||
s.mu.RUnlock()
|
||||
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
_ = json.NewEncoder(w).Encode(events)
|
||||
}
|
||||
|
||||
func (s *Service) handleStream(w http.ResponseWriter, r *http.Request) {
|
||||
flusher, ok := w.(http.Flusher)
|
||||
if !ok {
|
||||
http.Error(w, "streaming unsupported", http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
w.Header().Set("Content-Type", "text/event-stream")
|
||||
w.Header().Set("Cache-Control", "no-cache")
|
||||
w.Header().Set("Connection", "keep-alive")
|
||||
|
||||
ch := make(chan Event, 16)
|
||||
id := s.addSubscriber(ch)
|
||||
defer s.removeSubscriber(id)
|
||||
|
||||
// Send current snapshot immediately.
|
||||
current := Event{
|
||||
Type: "snapshot",
|
||||
Timestamp: time.Now(),
|
||||
Snapshot: s.snapshotStatus().Summary,
|
||||
}
|
||||
writeSSE(w, current)
|
||||
flusher.Flush()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-r.Context().Done():
|
||||
return
|
||||
case ev := <-ch:
|
||||
writeSSE(w, ev)
|
||||
flusher.Flush()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func writeSSE(w http.ResponseWriter, ev Event) {
|
||||
data, err := json.Marshal(ev)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
_, _ = fmt.Fprintf(w, "event: %s\n", ev.Type)
|
||||
_, _ = fmt.Fprintf(w, "data: %s\n\n", data)
|
||||
}
|
||||
|
||||
func (s *Service) addSubscriber(ch chan Event) int {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
s.nextSubID++
|
||||
id := s.nextSubID
|
||||
s.subs[id] = ch
|
||||
return id
|
||||
}
|
||||
|
||||
func (s *Service) removeSubscriber(id int) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
delete(s.subs, id)
|
||||
}
|
||||
Reference in New Issue
Block a user