package natspub import ( "context" "encoding/json" "fmt" "log/slog" "time" "github.com/nats-io/nats.go/jetstream" ) type FraudScoredHandler func(ctx context.Context, evt FraudScored) error type FraudScoredSubscriber struct { logger *slog.Logger consumer jetstream.Consumer handler FraudScoredHandler } func NewFraudScoredSubscriber( ctx context.Context, c *Client, durable string, handler FraudScoredHandler, logger *slog.Logger, ) (*FraudScoredSubscriber, error) { cons, err := c.js.CreateOrUpdateConsumer(ctx, StreamName, jetstream.ConsumerConfig{ Durable: durable, Name: durable, FilterSubject: SubjectFraudScored, AckPolicy: jetstream.AckExplicitPolicy, DeliverPolicy: jetstream.DeliverAllPolicy, MaxDeliver: 5, AckWait: 30 * time.Second, }) if err != nil { return nil, fmt.Errorf("create consumer %s: %w", durable, err) } return &FraudScoredSubscriber{ logger: logger, consumer: cons, handler: handler, }, nil } func (s *FraudScoredSubscriber) Start(ctx context.Context) (jetstream.ConsumeContext, error) { cc, err := s.consumer.Consume(func(msg jetstream.Msg) { var evt FraudScored if err := json.Unmarshal(msg.Data(), &evt); err != nil { s.logger.Error("decode fraud.scored", "err", err) _ = msg.Term() return } hctx, cancel := context.WithTimeout(ctx, 10*time.Second) defer cancel() if err := s.handler(hctx, evt); err != nil { s.logger.Error("handle fraud.scored", "err", err, "guest_id", evt.GuestID, "score", evt.Score) _ = msg.NakWithDelay(2 * time.Second) return } _ = msg.Ack() }) if err != nil { return nil, fmt.Errorf("consume: %w", err) } return cc, nil }