dc840bfc14
The Communications surface. Hosts can schedule custom broadcasts to a
chosen audience (everyone / attending / pending / declined / maybe),
edit or cancel anything that hasn't fired, and review delivery
outcomes. Four auto-reminders are pre-seeded on every new event:
7-day, 3-day last call, 1-day, and day-of.
Schema (migration 0012)
- scheduled_messages — one row per message envelope, with status
walking draft -> scheduled -> sending -> sent (or cancelled/failed).
Partial index on (send_at) WHERE status='scheduled' for the
scheduler poll; per-event index for the Communications tab list.
- message_deliveries — per-recipient outcomes so a partial-failure
batch doesn't lose the rows that did succeed.
Domain
- MessageAudience / MessageChannel / MessageStatus enums
- SeedAutoReminders helper that returns four canonical reminder rows
for a given event_date, skipping any whose send_at would land in
the past (events created close to the date)
Storage
- MessageRepo: Create / CreateBatch / Get / ListByEvent / Update
(locks the row and refuses unless status is draft|scheduled) /
Cancel / PromoteToScheduled (the send-now path) / ListDue /
ClaimForSending (atomic guard against two replicas double-sending) /
MarkSent / MarkFailed / RecordDelivery / DeliveryStats /
LoadRecipients (audience-filtered guest list) / CountRecipients
- EventRepo.Create now seeds auto-reminders in the same transaction
that inserts the event and its owner collaborator row
API (all editor+, except recipient-count which is viewer+)
- GET /events/{id}/messages
- GET /events/{id}/messages/recipient-count?audience=...
- POST /events/{id}/messages (draft / schedule / send-now)
- PATCH /events/{id}/messages/{message_id}
- POST /events/{id}/messages/{message_id}/send-now
- DELETE /events/{id}/messages/{message_id}
Scheduler worker (cmd/notifier)
- New file scheduler.go: polls ListDue every 30s, claims each row
atomically (ClaimForSending uses a status=scheduled guard so two
notifier replicas don't double-send), renders subject and body
per recipient with the {{guest_name}} / {{event_name}} /
{{event_date}} / {{venue}} / {{rsvp_link}} placeholders, sends via
the existing GuestEmailDispatcher (Resend > SMTP > SES > log
stub, same picker as the API), records each delivery row.
Frontend
- New CommunicationsCard.vue with compose form (audience + channel +
subject + body + send-mode radios), live "X guests will receive
this" recipient-count preview, and three sub-tabs for Scheduled /
Sent / Cancelled. Per-message Send-now and Cancel actions for
draft/scheduled rows. Friendly labels for auto-seeded reminders
("1-day reminder", "Day-of reminder") so the slugs never leak.
- New top-level tab "Communications" on the event-detail page,
between Collaborators and Branding.
Tests
- TestAutoReminderSeeding confirms a future-dated event lands the
four canonical reminders in scheduled state.
- TestComposeAndEditMessage walks draft -> patch -> send-now ->
cancel and asserts the conflict on PATCH-after-cancel.
- TestRecipientCountAudienceFilter seeds a known guest mix and
checks every audience preset returns the right count.
- Full integration suite passes (~177s).
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
406 lines
13 KiB
Go
406 lines
13 KiB
Go
package storage
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"time"
|
|
|
|
"github.com/google/uuid"
|
|
"github.com/jackc/pgx/v5"
|
|
"github.com/jackc/pgx/v5/pgxpool"
|
|
|
|
"github.com/alchemistkay/guestguard/internal/domain"
|
|
)
|
|
|
|
// MessageRepo owns the scheduled_messages + message_deliveries pair.
|
|
// Tier 2 Block F.
|
|
type MessageRepo struct {
|
|
pool *pgxpool.Pool
|
|
}
|
|
|
|
func NewMessageRepo(db *DB) *MessageRepo {
|
|
return &MessageRepo{pool: db.Pool}
|
|
}
|
|
|
|
type CreateMessageParams struct {
|
|
EventID uuid.UUID
|
|
SendAt time.Time
|
|
Audience domain.MessageAudience
|
|
Channel domain.MessageChannel
|
|
TemplateKey *string
|
|
Subject *string
|
|
Body string
|
|
Status domain.MessageStatus
|
|
CreatedBy *uuid.UUID
|
|
}
|
|
|
|
// Create inserts a row and returns the persisted message.
|
|
func (r *MessageRepo) Create(ctx context.Context, p CreateMessageParams) (*domain.ScheduledMessage, error) {
|
|
const q = `
|
|
INSERT INTO scheduled_messages
|
|
(event_id, send_at, audience, channel, template_key, subject, body, status, created_by)
|
|
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
|
|
RETURNING id, event_id, send_at, audience, channel, template_key, subject, body,
|
|
status, sent_at, recipient_count, created_by, created_at, updated_at
|
|
`
|
|
row := r.pool.QueryRow(ctx, q,
|
|
p.EventID, p.SendAt, p.Audience, p.Channel, p.TemplateKey, p.Subject,
|
|
p.Body, p.Status, p.CreatedBy,
|
|
)
|
|
return scanMessage(row)
|
|
}
|
|
|
|
// CreateBatch inserts many messages in one transaction. Used by the
|
|
// auto-reminder seeding path on event creation.
|
|
func (r *MessageRepo) CreateBatch(ctx context.Context, msgs []domain.ScheduledMessage) error {
|
|
if len(msgs) == 0 {
|
|
return nil
|
|
}
|
|
tx, err := r.pool.Begin(ctx)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer tx.Rollback(ctx)
|
|
|
|
const q = `
|
|
INSERT INTO scheduled_messages
|
|
(event_id, send_at, audience, channel, template_key, subject, body, status)
|
|
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
|
|
`
|
|
for _, m := range msgs {
|
|
if _, err := tx.Exec(ctx, q,
|
|
m.EventID, m.SendAt, m.Audience, m.Channel,
|
|
m.TemplateKey, m.Subject, m.Body, m.Status,
|
|
); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return tx.Commit(ctx)
|
|
}
|
|
|
|
// Get returns a single message by id, scoped to the event so a hostile
|
|
// editor on event A can't peek at event B.
|
|
func (r *MessageRepo) Get(ctx context.Context, eventID, msgID uuid.UUID) (*domain.ScheduledMessage, error) {
|
|
const q = `
|
|
SELECT id, event_id, send_at, audience, channel, template_key, subject, body,
|
|
status, sent_at, recipient_count, created_by, created_at, updated_at
|
|
FROM scheduled_messages
|
|
WHERE id = $1 AND event_id = $2
|
|
`
|
|
m, err := scanMessage(r.pool.QueryRow(ctx, q, msgID, eventID))
|
|
if err != nil {
|
|
if errors.Is(err, pgx.ErrNoRows) {
|
|
return nil, domain.ErrMessageNotFound
|
|
}
|
|
return nil, err
|
|
}
|
|
return m, nil
|
|
}
|
|
|
|
// ListByEvent returns every message for the event, newest first.
|
|
func (r *MessageRepo) ListByEvent(ctx context.Context, eventID uuid.UUID) ([]domain.ScheduledMessage, error) {
|
|
rows, err := r.pool.Query(ctx, `
|
|
SELECT id, event_id, send_at, audience, channel, template_key, subject, body,
|
|
status, sent_at, recipient_count, created_by, created_at, updated_at
|
|
FROM scheduled_messages
|
|
WHERE event_id = $1
|
|
ORDER BY created_at DESC
|
|
`, eventID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
out := []domain.ScheduledMessage{}
|
|
for rows.Next() {
|
|
m, err := scanMessage(rows)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
out = append(out, *m)
|
|
}
|
|
return out, rows.Err()
|
|
}
|
|
|
|
type UpdateMessageParams struct {
|
|
SendAt *time.Time
|
|
Audience *domain.MessageAudience
|
|
Channel *domain.MessageChannel
|
|
Subject *string
|
|
Body *string
|
|
}
|
|
|
|
// Update patches a scheduled message. Refuses unless status='scheduled'
|
|
// or 'draft' — once a message is sending/sent/cancelled, edits are not
|
|
// safe. Returns ErrMessageNotEditable when the state forbids the edit.
|
|
func (r *MessageRepo) Update(ctx context.Context, eventID, msgID uuid.UUID, p UpdateMessageParams) (*domain.ScheduledMessage, error) {
|
|
tx, err := r.pool.Begin(ctx)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer tx.Rollback(ctx)
|
|
|
|
var status domain.MessageStatus
|
|
if err := tx.QueryRow(ctx, `
|
|
SELECT status FROM scheduled_messages
|
|
WHERE id = $1 AND event_id = $2
|
|
FOR UPDATE
|
|
`, msgID, eventID).Scan(&status); err != nil {
|
|
if errors.Is(err, pgx.ErrNoRows) {
|
|
return nil, domain.ErrMessageNotFound
|
|
}
|
|
return nil, err
|
|
}
|
|
if status != domain.StatusScheduled && status != domain.StatusDraft {
|
|
return nil, domain.ErrMessageNotEditable
|
|
}
|
|
|
|
const upd = `
|
|
UPDATE scheduled_messages
|
|
SET send_at = COALESCE($3, send_at),
|
|
audience = COALESCE($4, audience),
|
|
channel = COALESCE($5, channel),
|
|
subject = COALESCE($6, subject),
|
|
body = COALESCE($7, body),
|
|
updated_at = now()
|
|
WHERE id = $1 AND event_id = $2
|
|
RETURNING id, event_id, send_at, audience, channel, template_key, subject, body,
|
|
status, sent_at, recipient_count, created_by, created_at, updated_at
|
|
`
|
|
m, err := scanMessage(tx.QueryRow(ctx, upd,
|
|
msgID, eventID, p.SendAt, p.Audience, p.Channel, p.Subject, p.Body,
|
|
))
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if err := tx.Commit(ctx); err != nil {
|
|
return nil, err
|
|
}
|
|
return m, nil
|
|
}
|
|
|
|
// Cancel marks a still-scheduled message as cancelled. A sending/sent
|
|
// message can't be cancelled.
|
|
func (r *MessageRepo) Cancel(ctx context.Context, eventID, msgID uuid.UUID) error {
|
|
tag, err := r.pool.Exec(ctx, `
|
|
UPDATE scheduled_messages
|
|
SET status = 'cancelled', updated_at = now()
|
|
WHERE id = $1 AND event_id = $2
|
|
AND status IN ('draft', 'scheduled')
|
|
`, msgID, eventID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if tag.RowsAffected() == 0 {
|
|
return domain.ErrMessageNotEditable
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// PromoteToScheduled flips a draft / future-scheduled row to "send right
|
|
// now" by setting send_at and (if needed) status. Powers the
|
|
// /messages/{id}/send-now endpoint.
|
|
func (r *MessageRepo) PromoteToScheduled(ctx context.Context, eventID, msgID uuid.UUID) error {
|
|
tag, err := r.pool.Exec(ctx, `
|
|
UPDATE scheduled_messages
|
|
SET send_at = now(),
|
|
status = 'scheduled',
|
|
updated_at = now()
|
|
WHERE id = $1 AND event_id = $2
|
|
AND status IN ('draft', 'scheduled')
|
|
`, msgID, eventID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if tag.RowsAffected() == 0 {
|
|
return domain.ErrMessageNotEditable
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// ListDue returns up to `limit` messages whose send_at has passed. The
|
|
// scheduler worker calls this every poll interval.
|
|
func (r *MessageRepo) ListDue(ctx context.Context, limit int) ([]domain.ScheduledMessage, error) {
|
|
if limit <= 0 || limit > 200 {
|
|
limit = 50
|
|
}
|
|
rows, err := r.pool.Query(ctx, `
|
|
SELECT id, event_id, send_at, audience, channel, template_key, subject, body,
|
|
status, sent_at, recipient_count, created_by, created_at, updated_at
|
|
FROM scheduled_messages
|
|
WHERE status = 'scheduled' AND send_at <= now()
|
|
ORDER BY send_at ASC
|
|
LIMIT $1
|
|
`, limit)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
out := []domain.ScheduledMessage{}
|
|
for rows.Next() {
|
|
m, err := scanMessage(rows)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
out = append(out, *m)
|
|
}
|
|
return out, rows.Err()
|
|
}
|
|
|
|
// ClaimForSending atomically transitions a single message from
|
|
// 'scheduled' to 'sending', returning whether the claim succeeded. Used
|
|
// by the worker to avoid two replicas double-sending the same message.
|
|
func (r *MessageRepo) ClaimForSending(ctx context.Context, msgID uuid.UUID) (bool, error) {
|
|
tag, err := r.pool.Exec(ctx, `
|
|
UPDATE scheduled_messages
|
|
SET status = 'sending', updated_at = now()
|
|
WHERE id = $1 AND status = 'scheduled'
|
|
`, msgID)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
return tag.RowsAffected() == 1, nil
|
|
}
|
|
|
|
// MarkSent flips a 'sending' message to 'sent', recording how many
|
|
// recipients the worker actually addressed.
|
|
func (r *MessageRepo) MarkSent(ctx context.Context, msgID uuid.UUID, recipientCount int) error {
|
|
_, err := r.pool.Exec(ctx, `
|
|
UPDATE scheduled_messages
|
|
SET status = 'sent',
|
|
sent_at = now(),
|
|
recipient_count = $2,
|
|
updated_at = now()
|
|
WHERE id = $1
|
|
`, msgID, recipientCount)
|
|
return err
|
|
}
|
|
|
|
// MarkFailed parks a message in 'failed' state with no recipients
|
|
// dispatched. Used when the worker can't even compose the message
|
|
// (event went away, etc.).
|
|
func (r *MessageRepo) MarkFailed(ctx context.Context, msgID uuid.UUID) error {
|
|
_, err := r.pool.Exec(ctx, `
|
|
UPDATE scheduled_messages
|
|
SET status = 'failed', updated_at = now()
|
|
WHERE id = $1
|
|
`, msgID)
|
|
return err
|
|
}
|
|
|
|
// RecordDelivery upserts one per-recipient outcome.
|
|
func (r *MessageRepo) RecordDelivery(ctx context.Context, d domain.MessageDelivery) error {
|
|
_, err := r.pool.Exec(ctx, `
|
|
INSERT INTO message_deliveries (message_id, guest_id, status, sent_at, error)
|
|
VALUES ($1, $2, $3, $4, $5)
|
|
ON CONFLICT (message_id, guest_id) DO UPDATE SET
|
|
status = EXCLUDED.status,
|
|
sent_at = EXCLUDED.sent_at,
|
|
error = EXCLUDED.error
|
|
`, d.MessageID, d.GuestID, d.Status, d.SentAt, d.Error)
|
|
return err
|
|
}
|
|
|
|
// DeliveryCounts returns "X of Y delivered" for the per-message line in
|
|
// the UI without pulling every delivery row.
|
|
type DeliveryStats struct {
|
|
Sent int `json:"sent"`
|
|
Failed int `json:"failed"`
|
|
Total int `json:"total"`
|
|
}
|
|
|
|
func (r *MessageRepo) DeliveryStats(ctx context.Context, msgID uuid.UUID) (DeliveryStats, error) {
|
|
var s DeliveryStats
|
|
err := r.pool.QueryRow(ctx, `
|
|
SELECT
|
|
count(*) FILTER (WHERE status = 'sent') AS sent,
|
|
count(*) FILTER (WHERE status IN ('failed','bounced','skipped')) AS failed,
|
|
count(*) AS total
|
|
FROM message_deliveries
|
|
WHERE message_id = $1
|
|
`, msgID).Scan(&s.Sent, &s.Failed, &s.Total)
|
|
return s, err
|
|
}
|
|
|
|
// MessageRecipient is one guest the audience filter resolved to for a
|
|
// given message. The notifier renders the body per recipient using
|
|
// these fields so {{guest_name}}, {{rsvp_link}}, etc. land correctly.
|
|
type MessageRecipient struct {
|
|
GuestID uuid.UUID
|
|
Name string
|
|
Email string
|
|
Phone string
|
|
TokenHash string // for link reconstruction (host pairs this with the raw token)
|
|
}
|
|
|
|
// LoadRecipients returns the audience-filtered slice of guests for a
|
|
// message. Only guests with an email (when channel includes email) or
|
|
// phone (when channel includes sms) are returned — others go on the
|
|
// skipped pile at delivery time.
|
|
func (r *MessageRepo) LoadRecipients(ctx context.Context, eventID uuid.UUID, audience domain.MessageAudience) ([]MessageRecipient, error) {
|
|
// Build the WHERE clause based on audience. We always include
|
|
// guests on the event; the audience condition adds an inner join /
|
|
// filter on rsvps.
|
|
var where string
|
|
switch audience {
|
|
case domain.AudienceAll:
|
|
where = ""
|
|
case domain.AudiencePending:
|
|
where = "AND NOT EXISTS (SELECT 1 FROM rsvps r WHERE r.guest_id = g.id)"
|
|
case domain.AudienceAttending:
|
|
where = "AND EXISTS (SELECT 1 FROM rsvps r WHERE r.guest_id = g.id AND r.response = 'attending')"
|
|
case domain.AudienceDeclined:
|
|
where = "AND EXISTS (SELECT 1 FROM rsvps r WHERE r.guest_id = g.id AND r.response = 'declined')"
|
|
case domain.AudienceMaybe:
|
|
where = "AND EXISTS (SELECT 1 FROM rsvps r WHERE r.guest_id = g.id AND r.response = 'maybe')"
|
|
default:
|
|
return nil, domain.ErrInvalidAudience
|
|
}
|
|
q := `
|
|
SELECT g.id, g.name, COALESCE(g.email,''), COALESCE(g.phone,''),
|
|
COALESCE(t.token_hash, '')
|
|
FROM guests g
|
|
LEFT JOIN tokens t ON t.guest_id = g.id
|
|
WHERE g.event_id = $1
|
|
` + where + `
|
|
ORDER BY g.created_at ASC
|
|
`
|
|
rows, err := r.pool.Query(ctx, q, eventID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
out := []MessageRecipient{}
|
|
for rows.Next() {
|
|
var rec MessageRecipient
|
|
if err := rows.Scan(&rec.GuestID, &rec.Name, &rec.Email, &rec.Phone, &rec.TokenHash); err != nil {
|
|
return nil, err
|
|
}
|
|
out = append(out, rec)
|
|
}
|
|
return out, rows.Err()
|
|
}
|
|
|
|
// CountRecipients is the fast-path of LoadRecipients used by the UI's
|
|
// "X guests will receive this" preview before send.
|
|
func (r *MessageRepo) CountRecipients(ctx context.Context, eventID uuid.UUID, audience domain.MessageAudience) (int, error) {
|
|
recs, err := r.LoadRecipients(ctx, eventID, audience)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
return len(recs), nil
|
|
}
|
|
|
|
func scanMessage(s rowScanner) (*domain.ScheduledMessage, error) {
|
|
var m domain.ScheduledMessage
|
|
err := s.Scan(
|
|
&m.ID, &m.EventID, &m.SendAt, &m.Audience, &m.Channel,
|
|
&m.TemplateKey, &m.Subject, &m.Body,
|
|
&m.Status, &m.SentAt, &m.RecipientCount, &m.CreatedBy,
|
|
&m.CreatedAt, &m.UpdatedAt,
|
|
)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return &m, nil
|
|
}
|