feat(tier2): reminders + broadcasts pipeline — Block F

The Communications surface. Hosts can schedule custom broadcasts to a
chosen audience (everyone / attending / pending / declined / maybe),
edit or cancel anything that hasn't fired, and review delivery
outcomes. Four auto-reminders are pre-seeded on every new event:
7-day, 3-day last call, 1-day, and day-of.

Schema (migration 0012)
- scheduled_messages — one row per message envelope, with status
  walking draft -> scheduled -> sending -> sent (or cancelled/failed).
  Partial index on (send_at) WHERE status='scheduled' for the
  scheduler poll; per-event index for the Communications tab list.
- message_deliveries — per-recipient outcomes so a partial-failure
  batch doesn't lose the rows that did succeed.

Domain
- MessageAudience / MessageChannel / MessageStatus enums
- SeedAutoReminders helper that returns four canonical reminder rows
  for a given event_date, skipping any whose send_at would land in
  the past (events created close to the date)

Storage
- MessageRepo: Create / CreateBatch / Get / ListByEvent / Update
  (locks the row and refuses unless status is draft|scheduled) /
  Cancel / PromoteToScheduled (the send-now path) / ListDue /
  ClaimForSending (atomic guard against two replicas double-sending) /
  MarkSent / MarkFailed / RecordDelivery / DeliveryStats /
  LoadRecipients (audience-filtered guest list) / CountRecipients
- EventRepo.Create now seeds auto-reminders in the same transaction
  that inserts the event and its owner collaborator row

API (all editor+, except recipient-count which is viewer+)
- GET    /events/{id}/messages
- GET    /events/{id}/messages/recipient-count?audience=...
- POST   /events/{id}/messages   (draft / schedule / send-now)
- PATCH  /events/{id}/messages/{message_id}
- POST   /events/{id}/messages/{message_id}/send-now
- DELETE /events/{id}/messages/{message_id}

Scheduler worker (cmd/notifier)
- New file scheduler.go: polls ListDue every 30s, claims each row
  atomically (ClaimForSending uses a status=scheduled guard so two
  notifier replicas don't double-send), renders subject and body
  per recipient with the {{guest_name}} / {{event_name}} /
  {{event_date}} / {{venue}} / {{rsvp_link}} placeholders, sends via
  the existing GuestEmailDispatcher (Resend > SMTP > SES > log
  stub, same picker as the API), records each delivery row.

Frontend
- New CommunicationsCard.vue with compose form (audience + channel +
  subject + body + send-mode radios), live "X guests will receive
  this" recipient-count preview, and three sub-tabs for Scheduled /
  Sent / Cancelled. Per-message Send-now and Cancel actions for
  draft/scheduled rows. Friendly labels for auto-seeded reminders
  ("1-day reminder", "Day-of reminder") so the slugs never leak.
- New top-level tab "Communications" on the event-detail page,
  between Collaborators and Branding.

Tests
- TestAutoReminderSeeding confirms a future-dated event lands the
  four canonical reminders in scheduled state.
- TestComposeAndEditMessage walks draft -> patch -> send-now ->
  cancel and asserts the conflict on PATCH-after-cancel.
- TestRecipientCountAudienceFilter seeds a known guest mix and
  checks every audience preset returns the right count.
- Full integration suite passes (~177s).

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
Kwaku Danso
2026-05-20 16:56:37 +01:00
parent dbddf17e3b
commit dc840bfc14
12 changed files with 1859 additions and 7 deletions
+316
View File
@@ -0,0 +1,316 @@
package api
import (
"encoding/json"
"errors"
"log/slog"
"net/http"
"time"
"github.com/alchemistkay/guestguard/internal/domain"
"github.com/alchemistkay/guestguard/internal/storage"
)
// messageHandler is the host-facing surface for Tier 2 Block F:
// scheduled reminders + custom broadcasts. Editor+ for writes; viewer+
// for reads via the existing requireRole gate.
type messageHandler struct {
logger *slog.Logger
events *storage.EventRepo
collabs *storage.CollaboratorRepo
repo *storage.MessageRepo
}
// --- response shapes ---
// messageView wraps the persisted row with a delivery summary. We don't
// inline the per-recipient rows here — the host expands a single message
// to drill into those — but the "X of Y delivered" rollup is useful in
// the list view.
type messageView struct {
*domain.ScheduledMessage
DeliveryStats storage.DeliveryStats `json:"delivery_stats"`
}
type listMessagesResponse struct {
Messages []messageView `json:"messages"`
}
// GET /events/{id}/messages — viewer+.
func (h *messageHandler) list(w http.ResponseWriter, r *http.Request) {
hostID, ok := hostFromContext(w, r)
if !ok {
return
}
eventID, ok := parseIDParam(w, r, "id")
if !ok {
return
}
if _, _, ok := requireRole(w, r, h.events, h.collabs, eventID, hostID, domain.RoleViewer); !ok {
return
}
msgs, err := h.repo.ListByEvent(r.Context(), eventID)
if err != nil {
writeError(w, http.StatusInternalServerError, "failed to list messages")
return
}
views := make([]messageView, 0, len(msgs))
for i := range msgs {
stats, _ := h.repo.DeliveryStats(r.Context(), msgs[i].ID)
views = append(views, messageView{
ScheduledMessage: &msgs[i],
DeliveryStats: stats,
})
}
writeJSON(w, http.StatusOK, listMessagesResponse{Messages: views})
}
// composeMessageRequest is what the Communications tab POSTs / PATCHes.
// SendAt is optional on create — omit it together with status="draft" to
// save a draft, or set it for scheduling. Channel defaults to email.
type composeMessageRequest struct {
SendAt *time.Time `json:"send_at"`
Audience domain.MessageAudience `json:"audience"`
Channel domain.MessageChannel `json:"channel"`
Subject string `json:"subject"`
Body string `json:"body"`
Draft bool `json:"draft"`
}
// recipientCountResponse is what the live "X guests will receive this"
// preview chip on the compose form fetches when the audience picker
// changes.
type recipientCountResponse struct {
Count int `json:"count"`
}
// GET /events/{id}/messages/recipient-count?audience=... — viewer+. Lets
// the compose form show a live count without needing to load the full
// guest list client-side.
func (h *messageHandler) recipientCount(w http.ResponseWriter, r *http.Request) {
hostID, ok := hostFromContext(w, r)
if !ok {
return
}
eventID, ok := parseIDParam(w, r, "id")
if !ok {
return
}
if _, _, ok := requireRole(w, r, h.events, h.collabs, eventID, hostID, domain.RoleViewer); !ok {
return
}
audience := domain.MessageAudience(r.URL.Query().Get("audience"))
if !audience.Valid() {
writeError(w, http.StatusBadRequest, "audience must be one of: all|attending|pending|declined|maybe")
return
}
n, err := h.repo.CountRecipients(r.Context(), eventID, audience)
if err != nil {
writeError(w, http.StatusInternalServerError, "failed to count recipients")
return
}
writeJSON(w, http.StatusOK, recipientCountResponse{Count: n})
}
// POST /events/{id}/messages — editor+.
//
// The host can save a draft (Draft=true, SendAt nil → status=draft) or
// schedule for later (SendAt set → status=scheduled). Send-immediately is
// just "schedule for now"; we expose a separate send-now route for the
// "send this draft right now" affordance below.
func (h *messageHandler) create(w http.ResponseWriter, r *http.Request) {
hostID, ok := hostFromContext(w, r)
if !ok {
return
}
eventID, ok := parseIDParam(w, r, "id")
if !ok {
return
}
if _, _, ok := requireRole(w, r, h.events, h.collabs, eventID, hostID, domain.RoleEditor); !ok {
return
}
var req composeMessageRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
writeError(w, http.StatusBadRequest, "invalid json")
return
}
if !req.Audience.Valid() {
writeError(w, http.StatusBadRequest, "audience required (all|attending|pending|declined|maybe)")
return
}
if req.Channel == "" {
req.Channel = domain.ChannelEmail
}
if !req.Channel.Valid() {
writeError(w, http.StatusBadRequest, "channel must be one of: email|sms|both")
return
}
if req.Body == "" {
writeError(w, http.StatusBadRequest, "body is required")
return
}
// Pick the status from send_at + draft flag.
status := domain.StatusScheduled
sendAt := time.Now().UTC()
if req.Draft || req.SendAt == nil {
if req.Draft {
status = domain.StatusDraft
} else {
// No send_at given and not flagged as draft: send now.
status = domain.StatusScheduled
}
} else {
sendAt = req.SendAt.UTC()
}
subj := req.Subject
var subjPtr *string
if subj != "" {
subjPtr = &subj
}
m, err := h.repo.Create(r.Context(), storage.CreateMessageParams{
EventID: eventID,
SendAt: sendAt,
Audience: req.Audience,
Channel: req.Channel,
Subject: subjPtr,
Body: req.Body,
Status: status,
CreatedBy: &hostID,
})
if err != nil {
h.logger.Error("create message", "err", err)
writeError(w, http.StatusInternalServerError, "failed to create message")
return
}
writeJSON(w, http.StatusCreated, m)
}
// PATCH /events/{id}/messages/{message_id} — editor+. Only legal while
// the message is still draft or scheduled (the storage layer enforces
// this with a row lock).
func (h *messageHandler) update(w http.ResponseWriter, r *http.Request) {
hostID, ok := hostFromContext(w, r)
if !ok {
return
}
eventID, ok := parseIDParam(w, r, "id")
if !ok {
return
}
if _, _, ok := requireRole(w, r, h.events, h.collabs, eventID, hostID, domain.RoleEditor); !ok {
return
}
msgID, ok := parseIDParam(w, r, "message_id")
if !ok {
return
}
var req composeMessageRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
writeError(w, http.StatusBadRequest, "invalid json")
return
}
params := storage.UpdateMessageParams{}
if req.SendAt != nil {
t := req.SendAt.UTC()
params.SendAt = &t
}
if req.Audience != "" {
if !req.Audience.Valid() {
writeError(w, http.StatusBadRequest, "invalid audience")
return
}
params.Audience = &req.Audience
}
if req.Channel != "" {
if !req.Channel.Valid() {
writeError(w, http.StatusBadRequest, "invalid channel")
return
}
params.Channel = &req.Channel
}
if req.Subject != "" {
params.Subject = &req.Subject
}
if req.Body != "" {
params.Body = &req.Body
}
m, err := h.repo.Update(r.Context(), eventID, msgID, params)
if err != nil {
switch {
case errors.Is(err, domain.ErrMessageNotFound):
writeError(w, http.StatusNotFound, "message not found")
case errors.Is(err, domain.ErrMessageNotEditable):
writeError(w, http.StatusConflict, "message can only be edited while scheduled or draft")
default:
h.logger.Error("update message", "err", err)
writeError(w, http.StatusInternalServerError, "failed to update message")
}
return
}
writeJSON(w, http.StatusOK, m)
}
// POST /events/{id}/messages/{message_id}/send-now — editor+.
// Promotes a draft or future-scheduled message to send_at=now, so the
// next scheduler poll picks it up.
func (h *messageHandler) sendNow(w http.ResponseWriter, r *http.Request) {
hostID, ok := hostFromContext(w, r)
if !ok {
return
}
eventID, ok := parseIDParam(w, r, "id")
if !ok {
return
}
if _, _, ok := requireRole(w, r, h.events, h.collabs, eventID, hostID, domain.RoleEditor); !ok {
return
}
msgID, ok := parseIDParam(w, r, "message_id")
if !ok {
return
}
if err := h.repo.PromoteToScheduled(r.Context(), eventID, msgID); err != nil {
if errors.Is(err, domain.ErrMessageNotEditable) {
writeError(w, http.StatusConflict, "message has already been sent or cancelled")
return
}
writeError(w, http.StatusInternalServerError, "failed to schedule send")
return
}
w.WriteHeader(http.StatusAccepted)
}
// DELETE /events/{id}/messages/{message_id} — editor+. Cancels a
// scheduled / draft message. Sent or sending messages can't be deleted.
func (h *messageHandler) cancel(w http.ResponseWriter, r *http.Request) {
hostID, ok := hostFromContext(w, r)
if !ok {
return
}
eventID, ok := parseIDParam(w, r, "id")
if !ok {
return
}
if _, _, ok := requireRole(w, r, h.events, h.collabs, eventID, hostID, domain.RoleEditor); !ok {
return
}
msgID, ok := parseIDParam(w, r, "message_id")
if !ok {
return
}
if err := h.repo.Cancel(r.Context(), eventID, msgID); err != nil {
if errors.Is(err, domain.ErrMessageNotEditable) {
writeError(w, http.StatusConflict, "message has already been sent")
return
}
writeError(w, http.StatusInternalServerError, "failed to cancel message")
return
}
w.WriteHeader(http.StatusNoContent)
}
+23
View File
@@ -42,6 +42,7 @@ type Server struct {
branding *brandingHandler
uploads *uploadHandler
security *securityHandler
messages *messageHandler
}
type ServerDeps struct {
@@ -103,6 +104,7 @@ func NewServer(deps ServerDeps) (*Server, error) {
brandingRepo := storage.NewBrandingRepo(deps.DB)
allowlistRepo := storage.NewAllowlistRepo(deps.DB)
editNonces := newEditNonceStore(deps.Redis)
messageRepo := storage.NewMessageRepo(deps.DB)
feedbackRepo := storage.NewFeedbackRepo(deps.DB)
// Branding image store. Empty UploadsDir leaves it nil and the upload
@@ -275,6 +277,12 @@ func NewServer(deps ServerDeps) (*Server, error) {
feedback: feedbackRepo,
access: accessRepo,
},
messages: &messageHandler{
logger: deps.Logger,
events: eventRepo,
collabs: collabRepo,
repo: messageRepo,
},
collabs: &collaboratorHandler{
logger: deps.Logger,
events: eventRepo,
@@ -391,6 +399,21 @@ func (s *Server) Handler() http.Handler {
mux.Handle("POST /events/{id}/access-logs/{log_id}/feedback",
authed(http.HandlerFunc(s.security.recordFeedback)))
// Block F — scheduled messages (reminders + broadcasts).
// All editor+ except the recipient-count preview which is viewer+.
mux.Handle("GET /events/{id}/messages",
authed(http.HandlerFunc(s.messages.list)))
mux.Handle("GET /events/{id}/messages/recipient-count",
authed(http.HandlerFunc(s.messages.recipientCount)))
mux.Handle("POST /events/{id}/messages",
authed(rl("messages_create", 100, 24*time.Hour, userIDKey, http.HandlerFunc(s.messages.create))))
mux.Handle("PATCH /events/{id}/messages/{message_id}",
authed(http.HandlerFunc(s.messages.update)))
mux.Handle("POST /events/{id}/messages/{message_id}/send-now",
authed(http.HandlerFunc(s.messages.sendNow)))
mux.Handle("DELETE /events/{id}/messages/{message_id}",
authed(http.HandlerFunc(s.messages.cancel)))
// Block D — event branding. Reads are viewer+; PUT is editor+. The
// upload endpoint is gated by auth only (any signed-in user can mint
// an image URL; the URL is no use without an event they can edit
+182
View File
@@ -0,0 +1,182 @@
package domain
import (
"errors"
"time"
"github.com/google/uuid"
)
// Tier 2 Block F — reminders + broadcasts.
// MessageAudience picks which slice of an event's guests a message
// targets. "All" means every guest; the response-filtered values mirror
// the RSVP states.
type MessageAudience string
const (
AudienceAll MessageAudience = "all"
AudienceAttending MessageAudience = "attending"
AudiencePending MessageAudience = "pending"
AudienceDeclined MessageAudience = "declined"
AudienceMaybe MessageAudience = "maybe"
)
func (a MessageAudience) Valid() bool {
switch a {
case AudienceAll, AudienceAttending, AudiencePending, AudienceDeclined, AudienceMaybe:
return true
}
return false
}
// MessageChannel is the delivery surface. SMS is gated by tier elsewhere
// (Tier 1 Block F); email is universal.
type MessageChannel string
const (
ChannelEmail MessageChannel = "email"
ChannelSMS MessageChannel = "sms"
ChannelBoth MessageChannel = "both"
)
func (c MessageChannel) Valid() bool {
switch c {
case ChannelEmail, ChannelSMS, ChannelBoth:
return true
}
return false
}
// MessageStatus walks the envelope through its life: draft (composed
// but not scheduled), scheduled (queued for the worker), sending (worker
// picked it up), sent (all deliveries attempted), cancelled (host
// pulled it), failed (worker couldn't proceed at all).
type MessageStatus string
const (
StatusDraft MessageStatus = "draft"
StatusScheduled MessageStatus = "scheduled"
StatusSending MessageStatus = "sending"
StatusSent MessageStatus = "sent"
StatusCancelled MessageStatus = "cancelled"
StatusFailed MessageStatus = "failed"
)
// ScheduledMessage is one host-composed (or auto-seeded) communication
// to a slice of an event's guests.
type ScheduledMessage struct {
ID uuid.UUID `json:"id"`
EventID uuid.UUID `json:"event_id"`
SendAt time.Time `json:"send_at"`
Audience MessageAudience `json:"audience"`
Channel MessageChannel `json:"channel"`
TemplateKey *string `json:"template_key,omitempty"`
Subject *string `json:"subject,omitempty"`
Body string `json:"body"`
Status MessageStatus `json:"status"`
SentAt *time.Time `json:"sent_at,omitempty"`
RecipientCount *int `json:"recipient_count,omitempty"`
CreatedBy *uuid.UUID `json:"created_by,omitempty"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
}
// MessageDelivery is one recipient's outcome within a message batch.
// Status here is a free-form string rather than an enum so the notifier
// can record provider-specific reasons (bounce / suppressed / opted-out
// etc.) without a new migration each time.
type MessageDelivery struct {
MessageID uuid.UUID `json:"message_id"`
GuestID uuid.UUID `json:"guest_id"`
Status string `json:"status"`
SentAt *time.Time `json:"sent_at,omitempty"`
Error *string `json:"error,omitempty"`
}
// Auto-reminder template keys. The scheduler treats these as hints when
// it picks subject lines / formats; the body can be edited by the host
// just like a custom broadcast.
const (
TemplateReminder7d = "reminder_7d"
TemplateReminder1d = "reminder_1d"
TemplateReminderDayOf = "reminder_dayof"
TemplateLastCall = "last_call"
)
// SeedAutoReminders returns the canonical set of auto-reminder rows for
// an event with the given date. Each row is returned without an ID;
// the caller (EventRepo.Create) inserts them. Rows whose send_at is in
// the past are dropped — a host who creates an event 2 days from the
// big day shouldn't get a "7 days to go" reminder scheduled into
// yesterday.
func SeedAutoReminders(eventID uuid.UUID, eventDate time.Time) []ScheduledMessage {
now := time.Now().UTC()
candidates := []struct {
key string
offset time.Duration
audience MessageAudience
subject string
body string
}{
{
key: TemplateReminder7d,
offset: -7 * 24 * time.Hour,
audience: AudiencePending,
subject: "One week to go — please let us know!",
body: "Hi {{guest_name}},\n\nThe big day for {{event_name}} is just a week away. We haven't heard from you yet — could you let us know whether you'll make it? Just tap the link below.\n\n{{rsvp_link}}\n\nThanks!",
},
{
key: TemplateLastCall,
offset: -3 * 24 * time.Hour,
audience: AudiencePending,
subject: "Last call to RSVP for {{event_name}}",
body: "Hi {{guest_name}},\n\n{{event_name}} is just three days away and we're finalising numbers with the venue. Please RSVP today:\n\n{{rsvp_link}}",
},
{
key: TemplateReminder1d,
offset: -1 * 24 * time.Hour,
audience: AudienceAttending,
subject: "Tomorrow! Quick details for {{event_name}}",
body: "Hi {{guest_name}},\n\nLooking forward to seeing you tomorrow at {{event_name}}.\n\nWhere: {{venue}}\nWhen: {{event_date}}\n\nSafe travels!",
},
{
key: TemplateReminderDayOf,
offset: -3 * time.Hour,
audience: AudienceAttending,
subject: "See you in a few hours at {{event_name}}",
body: "Hi {{guest_name}},\n\nWe're getting ready for you. {{event_name}} kicks off shortly at {{venue}}.\n\nSee you soon!",
},
}
out := make([]ScheduledMessage, 0, len(candidates))
for _, c := range candidates {
sendAt := eventDate.Add(c.offset)
// Skip reminders that would fire in the past — typical when an
// event is created close to the date. The host can still
// compose a custom broadcast.
if sendAt.Before(now) {
continue
}
key := c.key
subj := c.subject
out = append(out, ScheduledMessage{
EventID: eventID,
SendAt: sendAt,
Audience: c.audience,
Channel: ChannelEmail,
TemplateKey: &key,
Subject: &subj,
Body: c.body,
Status: StatusScheduled,
})
}
return out
}
var (
ErrMessageNotFound = errors.New("message not found")
ErrMessageNotEditable = errors.New("message can only be edited while scheduled")
ErrInvalidAudience = errors.New("invalid audience")
ErrInvalidChannel = errors.New("invalid channel")
)
+26
View File
@@ -78,12 +78,38 @@ func (r *EventRepo) Create(ctx context.Context, p CreateEventParams) (*domain.Ev
return nil, fmt.Errorf("seed owner collaborator: %w", err)
}
// Block F: auto-seed reminder messages so the host gets the
// "we'll nudge people for you" experience without lifting a finger.
// Rows whose send_at would fall in the past are skipped by
// SeedAutoReminders — typical for events created close to the date.
// Hosts can edit / cancel any of these from the Communications tab.
for _, m := range domain.SeedAutoReminders(ev.ID, ev.EventDate) {
if _, err := tx.Exec(ctx, `
INSERT INTO scheduled_messages
(event_id, send_at, audience, channel, template_key, subject, body, status)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
`, m.EventID, m.SendAt, m.Audience, m.Channel,
m.TemplateKey, m.Subject, m.Body, m.Status); err != nil {
return nil, fmt.Errorf("seed auto-reminder %s: %w",
ifNilString(m.TemplateKey), err)
}
}
if err := tx.Commit(ctx); err != nil {
return nil, err
}
return ev, nil
}
// ifNilString is a tiny helper so the error message above stays readable
// when an auto-reminder row somehow doesn't carry a template key.
func ifNilString(p *string) string {
if p == nil {
return "<unknown>"
}
return *p
}
func (r *EventRepo) Get(ctx context.Context, id uuid.UUID) (*domain.Event, error) {
const q = `
SELECT id, host_id, name, slug, event_date, venue, max_capacity, settings, status, created_at, updated_at, fraud_medium_threshold, fraud_high_threshold, fraud_block_threshold
+405
View File
@@ -0,0 +1,405 @@
package storage
import (
"context"
"errors"
"time"
"github.com/google/uuid"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/alchemistkay/guestguard/internal/domain"
)
// MessageRepo owns the scheduled_messages + message_deliveries pair.
// Tier 2 Block F.
type MessageRepo struct {
pool *pgxpool.Pool
}
func NewMessageRepo(db *DB) *MessageRepo {
return &MessageRepo{pool: db.Pool}
}
type CreateMessageParams struct {
EventID uuid.UUID
SendAt time.Time
Audience domain.MessageAudience
Channel domain.MessageChannel
TemplateKey *string
Subject *string
Body string
Status domain.MessageStatus
CreatedBy *uuid.UUID
}
// Create inserts a row and returns the persisted message.
func (r *MessageRepo) Create(ctx context.Context, p CreateMessageParams) (*domain.ScheduledMessage, error) {
const q = `
INSERT INTO scheduled_messages
(event_id, send_at, audience, channel, template_key, subject, body, status, created_by)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
RETURNING id, event_id, send_at, audience, channel, template_key, subject, body,
status, sent_at, recipient_count, created_by, created_at, updated_at
`
row := r.pool.QueryRow(ctx, q,
p.EventID, p.SendAt, p.Audience, p.Channel, p.TemplateKey, p.Subject,
p.Body, p.Status, p.CreatedBy,
)
return scanMessage(row)
}
// CreateBatch inserts many messages in one transaction. Used by the
// auto-reminder seeding path on event creation.
func (r *MessageRepo) CreateBatch(ctx context.Context, msgs []domain.ScheduledMessage) error {
if len(msgs) == 0 {
return nil
}
tx, err := r.pool.Begin(ctx)
if err != nil {
return err
}
defer tx.Rollback(ctx)
const q = `
INSERT INTO scheduled_messages
(event_id, send_at, audience, channel, template_key, subject, body, status)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
`
for _, m := range msgs {
if _, err := tx.Exec(ctx, q,
m.EventID, m.SendAt, m.Audience, m.Channel,
m.TemplateKey, m.Subject, m.Body, m.Status,
); err != nil {
return err
}
}
return tx.Commit(ctx)
}
// Get returns a single message by id, scoped to the event so a hostile
// editor on event A can't peek at event B.
func (r *MessageRepo) Get(ctx context.Context, eventID, msgID uuid.UUID) (*domain.ScheduledMessage, error) {
const q = `
SELECT id, event_id, send_at, audience, channel, template_key, subject, body,
status, sent_at, recipient_count, created_by, created_at, updated_at
FROM scheduled_messages
WHERE id = $1 AND event_id = $2
`
m, err := scanMessage(r.pool.QueryRow(ctx, q, msgID, eventID))
if err != nil {
if errors.Is(err, pgx.ErrNoRows) {
return nil, domain.ErrMessageNotFound
}
return nil, err
}
return m, nil
}
// ListByEvent returns every message for the event, newest first.
func (r *MessageRepo) ListByEvent(ctx context.Context, eventID uuid.UUID) ([]domain.ScheduledMessage, error) {
rows, err := r.pool.Query(ctx, `
SELECT id, event_id, send_at, audience, channel, template_key, subject, body,
status, sent_at, recipient_count, created_by, created_at, updated_at
FROM scheduled_messages
WHERE event_id = $1
ORDER BY created_at DESC
`, eventID)
if err != nil {
return nil, err
}
defer rows.Close()
out := []domain.ScheduledMessage{}
for rows.Next() {
m, err := scanMessage(rows)
if err != nil {
return nil, err
}
out = append(out, *m)
}
return out, rows.Err()
}
type UpdateMessageParams struct {
SendAt *time.Time
Audience *domain.MessageAudience
Channel *domain.MessageChannel
Subject *string
Body *string
}
// Update patches a scheduled message. Refuses unless status='scheduled'
// or 'draft' — once a message is sending/sent/cancelled, edits are not
// safe. Returns ErrMessageNotEditable when the state forbids the edit.
func (r *MessageRepo) Update(ctx context.Context, eventID, msgID uuid.UUID, p UpdateMessageParams) (*domain.ScheduledMessage, error) {
tx, err := r.pool.Begin(ctx)
if err != nil {
return nil, err
}
defer tx.Rollback(ctx)
var status domain.MessageStatus
if err := tx.QueryRow(ctx, `
SELECT status FROM scheduled_messages
WHERE id = $1 AND event_id = $2
FOR UPDATE
`, msgID, eventID).Scan(&status); err != nil {
if errors.Is(err, pgx.ErrNoRows) {
return nil, domain.ErrMessageNotFound
}
return nil, err
}
if status != domain.StatusScheduled && status != domain.StatusDraft {
return nil, domain.ErrMessageNotEditable
}
const upd = `
UPDATE scheduled_messages
SET send_at = COALESCE($3, send_at),
audience = COALESCE($4, audience),
channel = COALESCE($5, channel),
subject = COALESCE($6, subject),
body = COALESCE($7, body),
updated_at = now()
WHERE id = $1 AND event_id = $2
RETURNING id, event_id, send_at, audience, channel, template_key, subject, body,
status, sent_at, recipient_count, created_by, created_at, updated_at
`
m, err := scanMessage(tx.QueryRow(ctx, upd,
msgID, eventID, p.SendAt, p.Audience, p.Channel, p.Subject, p.Body,
))
if err != nil {
return nil, err
}
if err := tx.Commit(ctx); err != nil {
return nil, err
}
return m, nil
}
// Cancel marks a still-scheduled message as cancelled. A sending/sent
// message can't be cancelled.
func (r *MessageRepo) Cancel(ctx context.Context, eventID, msgID uuid.UUID) error {
tag, err := r.pool.Exec(ctx, `
UPDATE scheduled_messages
SET status = 'cancelled', updated_at = now()
WHERE id = $1 AND event_id = $2
AND status IN ('draft', 'scheduled')
`, msgID, eventID)
if err != nil {
return err
}
if tag.RowsAffected() == 0 {
return domain.ErrMessageNotEditable
}
return nil
}
// PromoteToScheduled flips a draft / future-scheduled row to "send right
// now" by setting send_at and (if needed) status. Powers the
// /messages/{id}/send-now endpoint.
func (r *MessageRepo) PromoteToScheduled(ctx context.Context, eventID, msgID uuid.UUID) error {
tag, err := r.pool.Exec(ctx, `
UPDATE scheduled_messages
SET send_at = now(),
status = 'scheduled',
updated_at = now()
WHERE id = $1 AND event_id = $2
AND status IN ('draft', 'scheduled')
`, msgID, eventID)
if err != nil {
return err
}
if tag.RowsAffected() == 0 {
return domain.ErrMessageNotEditable
}
return nil
}
// ListDue returns up to `limit` messages whose send_at has passed. The
// scheduler worker calls this every poll interval.
func (r *MessageRepo) ListDue(ctx context.Context, limit int) ([]domain.ScheduledMessage, error) {
if limit <= 0 || limit > 200 {
limit = 50
}
rows, err := r.pool.Query(ctx, `
SELECT id, event_id, send_at, audience, channel, template_key, subject, body,
status, sent_at, recipient_count, created_by, created_at, updated_at
FROM scheduled_messages
WHERE status = 'scheduled' AND send_at <= now()
ORDER BY send_at ASC
LIMIT $1
`, limit)
if err != nil {
return nil, err
}
defer rows.Close()
out := []domain.ScheduledMessage{}
for rows.Next() {
m, err := scanMessage(rows)
if err != nil {
return nil, err
}
out = append(out, *m)
}
return out, rows.Err()
}
// ClaimForSending atomically transitions a single message from
// 'scheduled' to 'sending', returning whether the claim succeeded. Used
// by the worker to avoid two replicas double-sending the same message.
func (r *MessageRepo) ClaimForSending(ctx context.Context, msgID uuid.UUID) (bool, error) {
tag, err := r.pool.Exec(ctx, `
UPDATE scheduled_messages
SET status = 'sending', updated_at = now()
WHERE id = $1 AND status = 'scheduled'
`, msgID)
if err != nil {
return false, err
}
return tag.RowsAffected() == 1, nil
}
// MarkSent flips a 'sending' message to 'sent', recording how many
// recipients the worker actually addressed.
func (r *MessageRepo) MarkSent(ctx context.Context, msgID uuid.UUID, recipientCount int) error {
_, err := r.pool.Exec(ctx, `
UPDATE scheduled_messages
SET status = 'sent',
sent_at = now(),
recipient_count = $2,
updated_at = now()
WHERE id = $1
`, msgID, recipientCount)
return err
}
// MarkFailed parks a message in 'failed' state with no recipients
// dispatched. Used when the worker can't even compose the message
// (event went away, etc.).
func (r *MessageRepo) MarkFailed(ctx context.Context, msgID uuid.UUID) error {
_, err := r.pool.Exec(ctx, `
UPDATE scheduled_messages
SET status = 'failed', updated_at = now()
WHERE id = $1
`, msgID)
return err
}
// RecordDelivery upserts one per-recipient outcome.
func (r *MessageRepo) RecordDelivery(ctx context.Context, d domain.MessageDelivery) error {
_, err := r.pool.Exec(ctx, `
INSERT INTO message_deliveries (message_id, guest_id, status, sent_at, error)
VALUES ($1, $2, $3, $4, $5)
ON CONFLICT (message_id, guest_id) DO UPDATE SET
status = EXCLUDED.status,
sent_at = EXCLUDED.sent_at,
error = EXCLUDED.error
`, d.MessageID, d.GuestID, d.Status, d.SentAt, d.Error)
return err
}
// DeliveryCounts returns "X of Y delivered" for the per-message line in
// the UI without pulling every delivery row.
type DeliveryStats struct {
Sent int `json:"sent"`
Failed int `json:"failed"`
Total int `json:"total"`
}
func (r *MessageRepo) DeliveryStats(ctx context.Context, msgID uuid.UUID) (DeliveryStats, error) {
var s DeliveryStats
err := r.pool.QueryRow(ctx, `
SELECT
count(*) FILTER (WHERE status = 'sent') AS sent,
count(*) FILTER (WHERE status IN ('failed','bounced','skipped')) AS failed,
count(*) AS total
FROM message_deliveries
WHERE message_id = $1
`, msgID).Scan(&s.Sent, &s.Failed, &s.Total)
return s, err
}
// MessageRecipient is one guest the audience filter resolved to for a
// given message. The notifier renders the body per recipient using
// these fields so {{guest_name}}, {{rsvp_link}}, etc. land correctly.
type MessageRecipient struct {
GuestID uuid.UUID
Name string
Email string
Phone string
TokenHash string // for link reconstruction (host pairs this with the raw token)
}
// LoadRecipients returns the audience-filtered slice of guests for a
// message. Only guests with an email (when channel includes email) or
// phone (when channel includes sms) are returned — others go on the
// skipped pile at delivery time.
func (r *MessageRepo) LoadRecipients(ctx context.Context, eventID uuid.UUID, audience domain.MessageAudience) ([]MessageRecipient, error) {
// Build the WHERE clause based on audience. We always include
// guests on the event; the audience condition adds an inner join /
// filter on rsvps.
var where string
switch audience {
case domain.AudienceAll:
where = ""
case domain.AudiencePending:
where = "AND NOT EXISTS (SELECT 1 FROM rsvps r WHERE r.guest_id = g.id)"
case domain.AudienceAttending:
where = "AND EXISTS (SELECT 1 FROM rsvps r WHERE r.guest_id = g.id AND r.response = 'attending')"
case domain.AudienceDeclined:
where = "AND EXISTS (SELECT 1 FROM rsvps r WHERE r.guest_id = g.id AND r.response = 'declined')"
case domain.AudienceMaybe:
where = "AND EXISTS (SELECT 1 FROM rsvps r WHERE r.guest_id = g.id AND r.response = 'maybe')"
default:
return nil, domain.ErrInvalidAudience
}
q := `
SELECT g.id, g.name, COALESCE(g.email,''), COALESCE(g.phone,''),
COALESCE(t.token_hash, '')
FROM guests g
LEFT JOIN tokens t ON t.guest_id = g.id
WHERE g.event_id = $1
` + where + `
ORDER BY g.created_at ASC
`
rows, err := r.pool.Query(ctx, q, eventID)
if err != nil {
return nil, err
}
defer rows.Close()
out := []MessageRecipient{}
for rows.Next() {
var rec MessageRecipient
if err := rows.Scan(&rec.GuestID, &rec.Name, &rec.Email, &rec.Phone, &rec.TokenHash); err != nil {
return nil, err
}
out = append(out, rec)
}
return out, rows.Err()
}
// CountRecipients is the fast-path of LoadRecipients used by the UI's
// "X guests will receive this" preview before send.
func (r *MessageRepo) CountRecipients(ctx context.Context, eventID uuid.UUID, audience domain.MessageAudience) (int, error) {
recs, err := r.LoadRecipients(ctx, eventID, audience)
if err != nil {
return 0, err
}
return len(recs), nil
}
func scanMessage(s rowScanner) (*domain.ScheduledMessage, error) {
var m domain.ScheduledMessage
err := s.Scan(
&m.ID, &m.EventID, &m.SendAt, &m.Audience, &m.Channel,
&m.TemplateKey, &m.Subject, &m.Body,
&m.Status, &m.SentAt, &m.RecipientCount, &m.CreatedBy,
&m.CreatedAt, &m.UpdatedAt,
)
if err != nil {
return nil, err
}
return &m, nil
}
@@ -0,0 +1,10 @@
DROP INDEX IF EXISTS idx_deliveries_message;
DROP TABLE IF EXISTS message_deliveries;
DROP INDEX IF EXISTS idx_messages_event;
DROP INDEX IF EXISTS idx_messages_due;
DROP TABLE IF EXISTS scheduled_messages;
DROP TYPE IF EXISTS message_status;
DROP TYPE IF EXISTS message_channel;
DROP TYPE IF EXISTS message_audience;
@@ -0,0 +1,72 @@
-- Tier 2 Block F — reminders + broadcasts.
--
-- The Tier 2 plan called this the messages pipeline. Two tables:
--
-- scheduled_messages — one row per message envelope. Status moves
-- scheduled -> sending -> sent (or cancelled / failed).
-- message_deliveries — one row per recipient. Lets a partial-failure
-- batch keep the rows that did succeed and surface
-- the rest in the UI.
--
-- The scheduler worker (cmd/notifier) polls scheduled_messages by
-- send_at; the index supports that without a sequential scan even on
-- thousands of pending rows.
DO $$ BEGIN
CREATE TYPE message_audience AS ENUM ('all', 'attending', 'pending', 'declined', 'maybe');
EXCEPTION WHEN duplicate_object THEN NULL; END $$;
DO $$ BEGIN
CREATE TYPE message_channel AS ENUM ('email', 'sms', 'both');
EXCEPTION WHEN duplicate_object THEN NULL; END $$;
DO $$ BEGIN
CREATE TYPE message_status AS ENUM ('draft', 'scheduled', 'sending', 'sent', 'cancelled', 'failed');
EXCEPTION WHEN duplicate_object THEN NULL; END $$;
CREATE TABLE IF NOT EXISTS scheduled_messages (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
event_id UUID NOT NULL REFERENCES events(id) ON DELETE CASCADE,
send_at TIMESTAMPTZ NOT NULL,
audience message_audience NOT NULL,
channel message_channel NOT NULL,
-- template_key tags the auto-seeded reminders ('reminder_7d',
-- 'reminder_1d', 'reminder_dayof', 'last_call'). NULL for hand-
-- composed broadcasts.
template_key TEXT,
subject TEXT,
body TEXT NOT NULL,
status message_status NOT NULL DEFAULT 'draft',
sent_at TIMESTAMPTZ,
recipient_count INTEGER,
-- created_by is the user who scheduled or composed the message.
-- NULL for system-seeded auto-reminders.
created_by UUID REFERENCES users(id) ON DELETE SET NULL,
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
-- The scheduler's hot query: "what's due right now?" Partial index keeps
-- it small even on events with hundreds of historical sent messages.
CREATE INDEX IF NOT EXISTS idx_messages_due
ON scheduled_messages (send_at)
WHERE status = 'scheduled';
-- "Show me this event's communications history" — used by the
-- Communications tab. Sorted newest-first so the index covers ORDER BY.
CREATE INDEX IF NOT EXISTS idx_messages_event
ON scheduled_messages (event_id, created_at DESC);
CREATE TABLE IF NOT EXISTS message_deliveries (
message_id UUID NOT NULL REFERENCES scheduled_messages(id) ON DELETE CASCADE,
guest_id UUID NOT NULL REFERENCES guests(id) ON DELETE CASCADE,
status TEXT NOT NULL, -- 'pending' | 'sent' | 'bounced' | 'skipped' | 'failed'
sent_at TIMESTAMPTZ,
error TEXT,
PRIMARY KEY (message_id, guest_id)
);
-- "Which delivers succeeded for this message?" — used by the per-message
-- drill-down in the UI.
CREATE INDEX IF NOT EXISTS idx_deliveries_message
ON message_deliveries (message_id, status);