package storage import ( "context" "errors" "time" "github.com/google/uuid" "github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5/pgxpool" "github.com/alchemistkay/guestguard/internal/domain" ) // MessageRepo owns the scheduled_messages + message_deliveries pair. // Tier 2 Block F. type MessageRepo struct { pool *pgxpool.Pool } func NewMessageRepo(db *DB) *MessageRepo { return &MessageRepo{pool: db.Pool} } type CreateMessageParams struct { EventID uuid.UUID SendAt time.Time Audience domain.MessageAudience Channel domain.MessageChannel TemplateKey *string Subject *string Body string Status domain.MessageStatus CreatedBy *uuid.UUID } // Create inserts a row and returns the persisted message. func (r *MessageRepo) Create(ctx context.Context, p CreateMessageParams) (*domain.ScheduledMessage, error) { const q = ` INSERT INTO scheduled_messages (event_id, send_at, audience, channel, template_key, subject, body, status, created_by) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) RETURNING id, event_id, send_at, audience, channel, template_key, subject, body, status, sent_at, recipient_count, created_by, created_at, updated_at ` row := r.pool.QueryRow(ctx, q, p.EventID, p.SendAt, p.Audience, p.Channel, p.TemplateKey, p.Subject, p.Body, p.Status, p.CreatedBy, ) return scanMessage(row) } // CreateBatch inserts many messages in one transaction. Used by the // auto-reminder seeding path on event creation. func (r *MessageRepo) CreateBatch(ctx context.Context, msgs []domain.ScheduledMessage) error { if len(msgs) == 0 { return nil } tx, err := r.pool.Begin(ctx) if err != nil { return err } defer tx.Rollback(ctx) const q = ` INSERT INTO scheduled_messages (event_id, send_at, audience, channel, template_key, subject, body, status) VALUES ($1, $2, $3, $4, $5, $6, $7, $8) ` for _, m := range msgs { if _, err := tx.Exec(ctx, q, m.EventID, m.SendAt, m.Audience, m.Channel, m.TemplateKey, m.Subject, m.Body, m.Status, ); err != nil { return err } } return tx.Commit(ctx) } // Get returns a single message by id, scoped to the event so a hostile // editor on event A can't peek at event B. func (r *MessageRepo) Get(ctx context.Context, eventID, msgID uuid.UUID) (*domain.ScheduledMessage, error) { const q = ` SELECT id, event_id, send_at, audience, channel, template_key, subject, body, status, sent_at, recipient_count, created_by, created_at, updated_at FROM scheduled_messages WHERE id = $1 AND event_id = $2 ` m, err := scanMessage(r.pool.QueryRow(ctx, q, msgID, eventID)) if err != nil { if errors.Is(err, pgx.ErrNoRows) { return nil, domain.ErrMessageNotFound } return nil, err } return m, nil } // ListByEvent returns every message for the event, newest first. func (r *MessageRepo) ListByEvent(ctx context.Context, eventID uuid.UUID) ([]domain.ScheduledMessage, error) { rows, err := r.pool.Query(ctx, ` SELECT id, event_id, send_at, audience, channel, template_key, subject, body, status, sent_at, recipient_count, created_by, created_at, updated_at FROM scheduled_messages WHERE event_id = $1 ORDER BY created_at DESC `, eventID) if err != nil { return nil, err } defer rows.Close() out := []domain.ScheduledMessage{} for rows.Next() { m, err := scanMessage(rows) if err != nil { return nil, err } out = append(out, *m) } return out, rows.Err() } type UpdateMessageParams struct { SendAt *time.Time Audience *domain.MessageAudience Channel *domain.MessageChannel Subject *string Body *string } // Update patches a scheduled message. Refuses unless status='scheduled' // or 'draft' — once a message is sending/sent/cancelled, edits are not // safe. Returns ErrMessageNotEditable when the state forbids the edit. func (r *MessageRepo) Update(ctx context.Context, eventID, msgID uuid.UUID, p UpdateMessageParams) (*domain.ScheduledMessage, error) { tx, err := r.pool.Begin(ctx) if err != nil { return nil, err } defer tx.Rollback(ctx) var status domain.MessageStatus if err := tx.QueryRow(ctx, ` SELECT status FROM scheduled_messages WHERE id = $1 AND event_id = $2 FOR UPDATE `, msgID, eventID).Scan(&status); err != nil { if errors.Is(err, pgx.ErrNoRows) { return nil, domain.ErrMessageNotFound } return nil, err } if status != domain.StatusScheduled && status != domain.StatusDraft { return nil, domain.ErrMessageNotEditable } const upd = ` UPDATE scheduled_messages SET send_at = COALESCE($3, send_at), audience = COALESCE($4, audience), channel = COALESCE($5, channel), subject = COALESCE($6, subject), body = COALESCE($7, body), updated_at = now() WHERE id = $1 AND event_id = $2 RETURNING id, event_id, send_at, audience, channel, template_key, subject, body, status, sent_at, recipient_count, created_by, created_at, updated_at ` m, err := scanMessage(tx.QueryRow(ctx, upd, msgID, eventID, p.SendAt, p.Audience, p.Channel, p.Subject, p.Body, )) if err != nil { return nil, err } if err := tx.Commit(ctx); err != nil { return nil, err } return m, nil } // Cancel marks a still-scheduled message as cancelled. A sending/sent // message can't be cancelled. func (r *MessageRepo) Cancel(ctx context.Context, eventID, msgID uuid.UUID) error { tag, err := r.pool.Exec(ctx, ` UPDATE scheduled_messages SET status = 'cancelled', updated_at = now() WHERE id = $1 AND event_id = $2 AND status IN ('draft', 'scheduled') `, msgID, eventID) if err != nil { return err } if tag.RowsAffected() == 0 { return domain.ErrMessageNotEditable } return nil } // PromoteToScheduled flips a draft / future-scheduled row to "send right // now" by setting send_at and (if needed) status. Powers the // /messages/{id}/send-now endpoint. func (r *MessageRepo) PromoteToScheduled(ctx context.Context, eventID, msgID uuid.UUID) error { tag, err := r.pool.Exec(ctx, ` UPDATE scheduled_messages SET send_at = now(), status = 'scheduled', updated_at = now() WHERE id = $1 AND event_id = $2 AND status IN ('draft', 'scheduled') `, msgID, eventID) if err != nil { return err } if tag.RowsAffected() == 0 { return domain.ErrMessageNotEditable } return nil } // ListDue returns up to `limit` messages whose send_at has passed. The // scheduler worker calls this every poll interval. func (r *MessageRepo) ListDue(ctx context.Context, limit int) ([]domain.ScheduledMessage, error) { if limit <= 0 || limit > 200 { limit = 50 } rows, err := r.pool.Query(ctx, ` SELECT id, event_id, send_at, audience, channel, template_key, subject, body, status, sent_at, recipient_count, created_by, created_at, updated_at FROM scheduled_messages WHERE status = 'scheduled' AND send_at <= now() ORDER BY send_at ASC LIMIT $1 `, limit) if err != nil { return nil, err } defer rows.Close() out := []domain.ScheduledMessage{} for rows.Next() { m, err := scanMessage(rows) if err != nil { return nil, err } out = append(out, *m) } return out, rows.Err() } // ClaimForSending atomically transitions a single message from // 'scheduled' to 'sending', returning whether the claim succeeded. Used // by the worker to avoid two replicas double-sending the same message. func (r *MessageRepo) ClaimForSending(ctx context.Context, msgID uuid.UUID) (bool, error) { tag, err := r.pool.Exec(ctx, ` UPDATE scheduled_messages SET status = 'sending', updated_at = now() WHERE id = $1 AND status = 'scheduled' `, msgID) if err != nil { return false, err } return tag.RowsAffected() == 1, nil } // MarkSent flips a 'sending' message to 'sent', recording how many // recipients the worker actually addressed. func (r *MessageRepo) MarkSent(ctx context.Context, msgID uuid.UUID, recipientCount int) error { _, err := r.pool.Exec(ctx, ` UPDATE scheduled_messages SET status = 'sent', sent_at = now(), recipient_count = $2, updated_at = now() WHERE id = $1 `, msgID, recipientCount) return err } // MarkFailed parks a message in 'failed' state with no recipients // dispatched. Used when the worker can't even compose the message // (event went away, etc.). func (r *MessageRepo) MarkFailed(ctx context.Context, msgID uuid.UUID) error { _, err := r.pool.Exec(ctx, ` UPDATE scheduled_messages SET status = 'failed', updated_at = now() WHERE id = $1 `, msgID) return err } // RecordDelivery upserts one per-recipient outcome. func (r *MessageRepo) RecordDelivery(ctx context.Context, d domain.MessageDelivery) error { _, err := r.pool.Exec(ctx, ` INSERT INTO message_deliveries (message_id, guest_id, status, sent_at, error) VALUES ($1, $2, $3, $4, $5) ON CONFLICT (message_id, guest_id) DO UPDATE SET status = EXCLUDED.status, sent_at = EXCLUDED.sent_at, error = EXCLUDED.error `, d.MessageID, d.GuestID, d.Status, d.SentAt, d.Error) return err } // DeliveryCounts returns "X of Y delivered" for the per-message line in // the UI without pulling every delivery row. type DeliveryStats struct { Sent int `json:"sent"` Failed int `json:"failed"` Total int `json:"total"` } func (r *MessageRepo) DeliveryStats(ctx context.Context, msgID uuid.UUID) (DeliveryStats, error) { var s DeliveryStats err := r.pool.QueryRow(ctx, ` SELECT count(*) FILTER (WHERE status = 'sent') AS sent, count(*) FILTER (WHERE status IN ('failed','bounced','skipped')) AS failed, count(*) AS total FROM message_deliveries WHERE message_id = $1 `, msgID).Scan(&s.Sent, &s.Failed, &s.Total) return s, err } // MessageRecipient is one guest the audience filter resolved to for a // given message. The notifier renders the body per recipient using // these fields so {{guest_name}}, {{rsvp_link}}, etc. land correctly. type MessageRecipient struct { GuestID uuid.UUID Name string Email string Phone string TokenHash string // for link reconstruction (host pairs this with the raw token) } // LoadRecipients returns the audience-filtered slice of guests for a // message. Only guests with an email (when channel includes email) or // phone (when channel includes sms) are returned — others go on the // skipped pile at delivery time. func (r *MessageRepo) LoadRecipients(ctx context.Context, eventID uuid.UUID, audience domain.MessageAudience) ([]MessageRecipient, error) { // Build the WHERE clause based on audience. We always include // guests on the event; the audience condition adds an inner join / // filter on rsvps. var where string switch audience { case domain.AudienceAll: where = "" case domain.AudiencePending: where = "AND NOT EXISTS (SELECT 1 FROM rsvps r WHERE r.guest_id = g.id)" case domain.AudienceAttending: where = "AND EXISTS (SELECT 1 FROM rsvps r WHERE r.guest_id = g.id AND r.response = 'attending')" case domain.AudienceDeclined: where = "AND EXISTS (SELECT 1 FROM rsvps r WHERE r.guest_id = g.id AND r.response = 'declined')" case domain.AudienceMaybe: where = "AND EXISTS (SELECT 1 FROM rsvps r WHERE r.guest_id = g.id AND r.response = 'maybe')" default: return nil, domain.ErrInvalidAudience } q := ` SELECT g.id, g.name, COALESCE(g.email,''), COALESCE(g.phone,''), COALESCE(t.token_hash, '') FROM guests g LEFT JOIN tokens t ON t.guest_id = g.id WHERE g.event_id = $1 ` + where + ` ORDER BY g.created_at ASC ` rows, err := r.pool.Query(ctx, q, eventID) if err != nil { return nil, err } defer rows.Close() out := []MessageRecipient{} for rows.Next() { var rec MessageRecipient if err := rows.Scan(&rec.GuestID, &rec.Name, &rec.Email, &rec.Phone, &rec.TokenHash); err != nil { return nil, err } out = append(out, rec) } return out, rows.Err() } // CountRecipients is the fast-path of LoadRecipients used by the UI's // "X guests will receive this" preview before send. func (r *MessageRepo) CountRecipients(ctx context.Context, eventID uuid.UUID, audience domain.MessageAudience) (int, error) { recs, err := r.LoadRecipients(ctx, eventID, audience) if err != nil { return 0, err } return len(recs), nil } func scanMessage(s rowScanner) (*domain.ScheduledMessage, error) { var m domain.ScheduledMessage err := s.Scan( &m.ID, &m.EventID, &m.SendAt, &m.Audience, &m.Channel, &m.TemplateKey, &m.Subject, &m.Body, &m.Status, &m.SentAt, &m.RecipientCount, &m.CreatedBy, &m.CreatedAt, &m.UpdatedAt, ) if err != nil { return nil, err } return &m, nil }