package api import ( "context" "encoding/json" "log/slog" "net/http" "sync" "time" "github.com/coder/websocket" "github.com/google/uuid" ) // WSEvent is the envelope pushed over WebSocket to dashboard clients. type WSEvent struct { Type string `json:"type"` EventID uuid.UUID `json:"event_id"` Payload json.RawMessage `json:"payload"` Timestamp time.Time `json:"timestamp"` } type subscriber struct { conn *websocket.Conn send chan []byte closed chan struct{} } // Hub fans out per-event WebSocket events to subscribers. Connections are // keyed by event_id; a single dashboard page subscribes to one event at a // time. Backpressure: if a slow client falls behind, we drop the message // for that subscriber rather than block the broadcaster. type Hub struct { logger *slog.Logger mu sync.RWMutex subs map[uuid.UUID]map[*subscriber]struct{} } func NewHub(logger *slog.Logger) *Hub { return &Hub{ logger: logger, subs: make(map[uuid.UUID]map[*subscriber]struct{}), } } // Broadcast publishes evt to all subscribers of evt.EventID. func (h *Hub) Broadcast(evt WSEvent) { if evt.Timestamp.IsZero() { evt.Timestamp = time.Now().UTC() } body, err := json.Marshal(evt) if err != nil { h.logger.Error("ws marshal", "err", err) return } h.mu.RLock() defer h.mu.RUnlock() for s := range h.subs[evt.EventID] { select { case s.send <- body: default: // drop on slow client; the connection will be closed when its // reader goroutine notices the closed channel. h.logger.Warn("ws subscriber slow, dropping message", "event_id", evt.EventID) } } } func (h *Hub) add(eventID uuid.UUID, s *subscriber) { h.mu.Lock() defer h.mu.Unlock() if h.subs[eventID] == nil { h.subs[eventID] = make(map[*subscriber]struct{}) } h.subs[eventID][s] = struct{}{} } func (h *Hub) remove(eventID uuid.UUID, s *subscriber) { h.mu.Lock() defer h.mu.Unlock() if subs, ok := h.subs[eventID]; ok { delete(subs, s) if len(subs) == 0 { delete(h.subs, eventID) } } } type wsHandler struct { logger *slog.Logger hub *Hub } // GET /ws/events/{id} — dashboard live feed for one event. func (h *wsHandler) handle(w http.ResponseWriter, r *http.Request) { eventID, ok := parseIDParam(w, r, "id") if !ok { return } conn, err := websocket.Accept(w, r, &websocket.AcceptOptions{ // In dev the frontend runs on a different origin (localhost:3000 → localhost:8080). // We're not relying on cookies, so it's safe to skip the same-origin check. InsecureSkipVerify: true, }) if err != nil { h.logger.Warn("ws accept", "err", err) return } sub := &subscriber{ conn: conn, send: make(chan []byte, 32), closed: make(chan struct{}), } h.hub.add(eventID, sub) defer h.hub.remove(eventID, sub) ctx := conn.CloseRead(r.Context()) pingTicker := time.NewTicker(20 * time.Second) defer pingTicker.Stop() for { select { case <-ctx.Done(): conn.Close(websocket.StatusNormalClosure, "") return case msg := <-sub.send: writeCtx, cancel := context.WithTimeout(ctx, 5*time.Second) err := conn.Write(writeCtx, websocket.MessageText, msg) cancel() if err != nil { conn.Close(websocket.StatusInternalError, "write failed") return } case <-pingTicker.C: pingCtx, cancel := context.WithTimeout(ctx, 5*time.Second) err := conn.Ping(pingCtx) cancel() if err != nil { return } } } }