// Package natspub connects to NATS, makes sure the JetStream stream exists,
// and publishes the service's events to it as JSON messages.
package natspub

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"log/slog"
	"time"

	"github.com/google/uuid"
	"github.com/nats-io/nats.go"
	"github.com/nats-io/nats.go/jetstream"
)

// Client bundles the NATS connection, its JetStream handle and the logger
// used for connection lifecycle messages.
type Client struct {
	conn   *nats.Conn
	js     jetstream.JetStream
	logger *slog.Logger
}

// Connect dials the NATS server at url, creates a JetStream handle and ensures
// the stream exists. ctx bounds only the stream creation call. A nil logger
// falls back to slog.Default(). On any failure the connection is closed and a
// wrapped error is returned.
func Connect(ctx context.Context, url string, logger *slog.Logger) (*Client, error) {
	// The handlers below run for the lifetime of the connection; a nil logger
	// would panic inside them on the first disconnect.
	if logger == nil {
		logger = slog.Default()
	}

	conn, err := nats.Connect(url,
		nats.Name("guestguard-api"),
		nats.MaxReconnects(-1), // never give up reconnecting
		nats.ReconnectWait(2*time.Second),
		nats.DisconnectErrHandler(func(_ *nats.Conn, err error) {
			if err != nil {
				logger.Warn("nats disconnected", "err", err)
			}
		}),
		nats.ReconnectHandler(func(c *nats.Conn) {
			logger.Info("nats reconnected", "url", c.ConnectedUrl())
		}),
	)
	if err != nil {
		return nil, fmt.Errorf("connect nats: %w", err)
	}

	js, err := jetstream.New(conn)
	if err != nil {
		conn.Close()
		return nil, fmt.Errorf("jetstream: %w", err)
	}

	c := &Client{conn: conn, js: js, logger: logger}
	if err := c.ensureStream(ctx); err != nil {
		conn.Close()
		return nil, err
	}
	return c, nil
}

// ensureStream creates the stream, or updates it to the configuration below
// if it already exists. StreamName and StreamSubjects are defined elsewhere in
// this package.
func (c *Client) ensureStream(ctx context.Context) error {
	cfg := jetstream.StreamConfig{
		Name:      StreamName,
		Subjects:  StreamSubjects(),
		Retention: jetstream.LimitsPolicy,
		Storage:   jetstream.FileStorage,
		MaxAge:    14 * 24 * time.Hour,
		Replicas:  1,
	}
	if _, err := c.js.CreateOrUpdateStream(ctx, cfg); err != nil {
		return fmt.Errorf("create stream %s: %w", StreamName, err)
	}
	return nil
}

// Close drains the connection. It is best-effort: a drain error is logged at
// debug level (when a logger is set) and otherwise ignored. It is a no-op
// when the client has no connection.
func (c *Client) Close() {
	if c.conn == nil {
		return
	}
	if err := c.conn.Drain(); err != nil && c.logger != nil {
		c.logger.Debug("nats drain", "err", err)
	}
}

// JetStream returns the underlying JetStream handle.
func (c *Client) JetStream() jetstream.JetStream {
	return c.js
}

// PublishAccessAttempted publishes evt on SubjectAccessAttempted. A zero
// OccurredAt is replaced with the current UTC time.
//
// The message ID combines the guest ID with the event's own timestamp:
// separate attempts by the same guest stay distinct, while re-publishing the
// same event (same GuestID and OccurredAt) is deduplicated by JetStream. No
// message ID is set when GuestID is uuid.Nil.
func (c *Client) PublishAccessAttempted(ctx context.Context, evt AccessAttempted) error {
	if evt.OccurredAt.IsZero() {
		evt.OccurredAt = time.Now().UTC()
	}
	var msgID string
	if evt.GuestID != uuid.Nil {
		msgID = SubjectAccessAttempted + ":" + evt.GuestID.String() + ":" +
			evt.OccurredAt.UTC().Format(time.RFC3339Nano)
	}
	return c.publishWithMsgID(ctx, SubjectAccessAttempted, evt, msgID)
}

// PublishRSVPConfirmed publishes evt on SubjectRSVPConfirmed, deduplicated by
// RSVPID. A zero SubmittedAt is replaced with the current UTC time.
func (c *Client) PublishRSVPConfirmed(ctx context.Context, evt RSVPConfirmed) error {
	if evt.SubmittedAt.IsZero() {
		evt.SubmittedAt = time.Now().UTC()
	}
	return c.publishJSON(ctx, SubjectRSVPConfirmed, evt, evt.RSVPID)
}

// PublishInvitationSend publishes evt on SubjectInvitationSend. A zero
// IssuedAt is replaced with the current UTC time.
func (c *Client) PublishInvitationSend(ctx context.Context, evt InvitationSend) error {
	if evt.IssuedAt.IsZero() {
		evt.IssuedAt = time.Now().UTC()
	}
	// Dedup by token id — re-issuing a token (currently disallowed by the
	// unique constraint, but defensive) won't double-send the email.
	return c.publishJSON(ctx, SubjectInvitationSend, evt, evt.TokenID)
}

// publishJSON publishes payload as JSON on subject, deduplicated by dedupeKey.
//
// The message ID is "<subject>:<dedupeKey>" and nothing else. It must be
// deterministic: JetStream drops a message only when its Nats-Msg-Id matches
// one already seen, so mixing in the publish-time clock would make every ID
// unique and disable deduplication entirely. A uuid.Nil key sends the message
// without an ID, i.e. without deduplication.
//
// NOTE(review): ensureStream does not set a duplicate window, so the server's
// default window applies — confirm it is long enough for the retry paths.
func (c *Client) publishJSON(ctx context.Context, subject string, payload any, dedupeKey uuid.UUID) error {
	var msgID string
	if dedupeKey != uuid.Nil {
		msgID = subject + ":" + dedupeKey.String()
	}
	return c.publishWithMsgID(ctx, subject, payload, msgID)
}

// publishWithMsgID marshals payload to JSON and publishes it on subject,
// waiting up to three seconds for the publish to complete. A non-empty msgID
// is sent as the Nats-Msg-Id header. Marshal and publish failures are returned
// wrapped; a deadline expiry gets its own "timed out" message.
func (c *Client) publishWithMsgID(ctx context.Context, subject string, payload any, msgID string) error {
	const publishTimeout = 3 * time.Second

	body, err := json.Marshal(payload)
	if err != nil {
		return fmt.Errorf("marshal %s: %w", subject, err)
	}

	msg := &nats.Msg{Subject: subject, Data: body, Header: nats.Header{}}
	msg.Header.Set("Content-Type", "application/json")
	if msgID != "" {
		msg.Header.Set("Nats-Msg-Id", msgID)
	}

	pubCtx, cancel := context.WithTimeout(ctx, publishTimeout)
	defer cancel()

	if _, err := c.js.PublishMsg(pubCtx, msg); err != nil {
		if errors.Is(err, context.DeadlineExceeded) {
			return fmt.Errorf("publish %s timed out: %w", subject, err)
		}
		return fmt.Errorf("publish %s: %w", subject, err)
	}
	return nil
}