From dc840bfc14c84500ba50e23ae5b9840e8a7177ab Mon Sep 17 00:00:00 2001 From: Kwaku Danso <72142185+cloud-dev101@users.noreply.github.com> Date: Wed, 20 May 2026 16:56:37 +0100 Subject: [PATCH] =?UTF-8?q?feat(tier2):=20reminders=20+=20broadcasts=20pip?= =?UTF-8?q?eline=20=E2=80=94=20Block=20F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The Communications surface. Hosts can schedule custom broadcasts to a chosen audience (everyone / attending / pending / declined / maybe), edit or cancel anything that hasn't fired, and review delivery outcomes. Four auto-reminders are pre-seeded on every new event: 7-day, 3-day last call, 1-day, and day-of. Schema (migration 0012) - scheduled_messages — one row per message envelope, with status walking draft -> scheduled -> sending -> sent (or cancelled/failed). Partial index on (send_at) WHERE status='scheduled' for the scheduler poll; per-event index for the Communications tab list. - message_deliveries — per-recipient outcomes so a partial-failure batch doesn't lose the rows that did succeed. Domain - MessageAudience / MessageChannel / MessageStatus enums - SeedAutoReminders helper that returns four canonical reminder rows for a given event_date, skipping any whose send_at would land in the past (events created close to the date) Storage - MessageRepo: Create / CreateBatch / Get / ListByEvent / Update (locks the row and refuses unless status is draft|scheduled) / Cancel / PromoteToScheduled (the send-now path) / ListDue / ClaimForSending (atomic guard against two replicas double-sending) / MarkSent / MarkFailed / RecordDelivery / DeliveryStats / LoadRecipients (audience-filtered guest list) / CountRecipients - EventRepo.Create now seeds auto-reminders in the same transaction that inserts the event and its owner collaborator row API (all editor+, except recipient-count which is viewer+) - GET /events/{id}/messages - GET /events/{id}/messages/recipient-count?audience=... - POST /events/{id}/messages (draft / schedule / send-now) - PATCH /events/{id}/messages/{message_id} - POST /events/{id}/messages/{message_id}/send-now - DELETE /events/{id}/messages/{message_id} Scheduler worker (cmd/notifier) - New file scheduler.go: polls ListDue every 30s, claims each row atomically (ClaimForSending uses a status=scheduled guard so two notifier replicas don't double-send), renders subject and body per recipient with the {{guest_name}} / {{event_name}} / {{event_date}} / {{venue}} / {{rsvp_link}} placeholders, sends via the existing GuestEmailDispatcher (Resend > SMTP > SES > log stub, same picker as the API), records each delivery row. Frontend - New CommunicationsCard.vue with compose form (audience + channel + subject + body + send-mode radios), live "X guests will receive this" recipient-count preview, and three sub-tabs for Scheduled / Sent / Cancelled. Per-message Send-now and Cancel actions for draft/scheduled rows. Friendly labels for auto-seeded reminders ("1-day reminder", "Day-of reminder") so the slugs never leak. - New top-level tab "Communications" on the event-detail page, between Collaborators and Branding. Tests - TestAutoReminderSeeding confirms a future-dated event lands the four canonical reminders in scheduled state. - TestComposeAndEditMessage walks draft -> patch -> send-now -> cancel and asserts the conflict on PATCH-after-cancel. - TestRecipientCountAudienceFilter seeds a known guest mix and checks every audience preset returns the right count. - Full integration suite passes (~177s). Co-Authored-By: Claude Opus 4.7 --- cmd/notifier/main.go | 6 + cmd/notifier/scheduler.go | 223 ++++++++++ frontend/components/CommunicationsCard.vue | 415 ++++++++++++++++++ frontend/pages/dashboard/events/[id].vue | 20 +- internal/api/messages.go | 316 +++++++++++++ internal/api/server.go | 23 + internal/domain/messages.go | 182 ++++++++ internal/storage/events.go | 26 ++ internal/storage/messages.go | 405 +++++++++++++++++ .../storage/migrations/0012_messages.down.sql | 10 + .../storage/migrations/0012_messages.up.sql | 72 +++ test/integration/messages_test.go | 168 +++++++ 12 files changed, 1859 insertions(+), 7 deletions(-) create mode 100644 cmd/notifier/scheduler.go create mode 100644 frontend/components/CommunicationsCard.vue create mode 100644 internal/api/messages.go create mode 100644 internal/domain/messages.go create mode 100644 internal/storage/messages.go create mode 100644 internal/storage/migrations/0012_messages.down.sql create mode 100644 internal/storage/migrations/0012_messages.up.sql create mode 100644 test/integration/messages_test.go diff --git a/cmd/notifier/main.go b/cmd/notifier/main.go index 879acd6..905e078 100644 --- a/cmd/notifier/main.go +++ b/cmd/notifier/main.go @@ -152,6 +152,12 @@ func run() error { } defer invitationCC.Stop() + // Block F — scheduled-message worker. Polls scheduled_messages + // every 30s and dispatches due rows through the same email + // pipeline as the NATS-driven flows. + scheduler := newScheduledMessageWorker(logger, db, combinedEmail, cfg.PublicBaseURL) + go scheduler.Start(rootCtx) + logger.Info("notifier started") <-rootCtx.Done() logger.Info("notifier shutting down") diff --git a/cmd/notifier/scheduler.go b/cmd/notifier/scheduler.go new file mode 100644 index 0000000..f188e2e --- /dev/null +++ b/cmd/notifier/scheduler.go @@ -0,0 +1,223 @@ +package main + +import ( + "context" + "fmt" + "log/slog" + "strings" + "time" + + "github.com/google/uuid" + + "github.com/alchemistkay/guestguard/internal/auth" + "github.com/alchemistkay/guestguard/internal/domain" + "github.com/alchemistkay/guestguard/internal/notification" + "github.com/alchemistkay/guestguard/internal/storage" +) + +// scheduledMessageWorker is the Tier 2 Block F poller. Every `interval` +// it asks the messages repo for the set of scheduled rows whose send_at +// has passed, claims each one (status: scheduled -> sending), fans out +// to its audience, and records per-recipient delivery rows. +// +// Two-replica safety: ClaimForSending uses a `WHERE status='scheduled'` +// guard so a parallel worker racing on the same row will lose; only the +// claim that flips the row wins. +type scheduledMessageWorker struct { + logger *slog.Logger + repo *storage.MessageRepo + events *storage.EventRepo + guests *storage.GuestRepo + sender notification.GuestEmailDispatcher + publicBaseURL string + interval time.Duration + batchSize int +} + +func newScheduledMessageWorker( + logger *slog.Logger, + db *storage.DB, + sender notification.GuestEmailDispatcher, + publicBaseURL string, +) *scheduledMessageWorker { + return &scheduledMessageWorker{ + logger: logger.With("worker", "scheduled-messages"), + repo: storage.NewMessageRepo(db), + events: storage.NewEventRepo(db), + guests: storage.NewGuestRepo(db), + sender: sender, + publicBaseURL: publicBaseURL, + interval: 30 * time.Second, + batchSize: 50, + } +} + +// Start blocks until ctx is cancelled. It polls on `interval` and runs +// a single batch on each tick. +func (w *scheduledMessageWorker) Start(ctx context.Context) { + w.logger.Info("scheduled-message worker started", "interval", w.interval) + // One immediate run on boot, then the periodic tick. + w.runOnce(ctx) + t := time.NewTicker(w.interval) + defer t.Stop() + for { + select { + case <-ctx.Done(): + w.logger.Info("scheduled-message worker stopping") + return + case <-t.C: + w.runOnce(ctx) + } + } +} + +func (w *scheduledMessageWorker) runOnce(ctx context.Context) { + due, err := w.repo.ListDue(ctx, w.batchSize) + if err != nil { + w.logger.Error("list due messages", "err", err) + return + } + for _, m := range due { + w.processOne(ctx, m) + } +} + +// processOne claims, dispatches, and finalises one message. Per-recipient +// failures are logged + recorded but don't abort the batch. A whole- +// batch failure (event gone, etc.) marks the message 'failed'. +func (w *scheduledMessageWorker) processOne(ctx context.Context, m domain.ScheduledMessage) { + claimed, err := w.repo.ClaimForSending(ctx, m.ID) + if err != nil { + w.logger.Error("claim message", "err", err, "message_id", m.ID) + return + } + if !claimed { + // Another worker beat us to it, or the row state changed. + return + } + log := w.logger.With("message_id", m.ID, "event_id", m.EventID, "audience", m.Audience) + log.Info("dispatching scheduled message") + + event, err := w.events.Get(ctx, m.EventID) + if err != nil { + log.Error("load event for message", "err", err) + _ = w.repo.MarkFailed(ctx, m.ID) + return + } + + recipients, err := w.repo.LoadRecipients(ctx, m.EventID, m.Audience) + if err != nil { + log.Error("load recipients", "err", err) + _ = w.repo.MarkFailed(ctx, m.ID) + return + } + + subject := "" + if m.Subject != nil { + subject = renderTemplate(*m.Subject, templateData(event, nil, "")) + } + if subject == "" { + // Cheap fallback so providers don't bounce subject-less mail. + subject = "Update for " + event.Name + } + + var sent, failed int + for _, rec := range recipients { + guestLog := log.With("guest_id", rec.GuestID) + if rec.Email == "" { + // SMS-only audiences would be addressed via Twilio; for the + // MVP we just record-and-skip when there's no email. + _ = w.repo.RecordDelivery(ctx, domain.MessageDelivery{ + MessageID: m.ID, + GuestID: rec.GuestID, + Status: "skipped", + Error: stringPtr("no email on file"), + }) + continue + } + + // Build the per-recipient RSVP link. We can't decrypt the hash, + // so when the host originally minted the token-issue path put + // the raw value in the invitation email; for follow-up + // reminders we fall back to "go to the host's event" if the + // recipient hash isn't reversible. Practical setup: most + // hosts will use a custom URL via publicBaseURL. + rsvpURL := w.publicBaseURL + if rsvpURL != "" { + rsvpURL = strings.TrimRight(rsvpURL, "/") + "/rsvp" + } + + body := renderTemplate(m.Body, templateData(event, &rec, rsvpURL)) + + _, sendErr := w.sender.SendGuest(ctx, rec.Email, subject, notification.TmplReminder, + map[string]any{ + "Subject": subject, + "GuestName": rec.Name, + "EventName": event.Name, + "Venue": event.Venue, + "EventDate": event.EventDate.Format("Mon 2 Jan 2006 · 15:04"), + "Body": body, + "Link": rsvpURL, + }) + status := "sent" + var errStr *string + sentAt := time.Now().UTC() + if sendErr != nil { + status = "failed" + s := sendErr.Error() + errStr = &s + failed++ + guestLog.Warn("send failed", "err", sendErr) + } else { + sent++ + } + _ = w.repo.RecordDelivery(ctx, domain.MessageDelivery{ + MessageID: m.ID, + GuestID: rec.GuestID, + Status: status, + SentAt: &sentAt, + Error: errStr, + }) + } + + if err := w.repo.MarkSent(ctx, m.ID, sent); err != nil { + log.Error("mark sent", "err", err) + } + log.Info("message dispatched", "sent", sent, "failed", failed, "total", len(recipients)) +} + +// templateData composes the {{var}} substitution map. nil rec gives a +// "generic" set for rendering the subject without a recipient context; +// per-recipient body rendering passes the real recipient. +func templateData(event *domain.Event, rec *storage.MessageRecipient, rsvpURL string) map[string]string { + d := map[string]string{ + "event_name": event.Name, + "event_date": event.EventDate.Format("Mon 2 Jan 2006 · 15:04"), + "venue": event.Venue, + "rsvp_link": rsvpURL, + } + if rec != nil { + d["guest_name"] = rec.Name + } + return d +} + +// renderTemplate does single-pass {{var}} replacement. Simple and +// dependency-free; we're not running untrusted template strings here. +func renderTemplate(tpl string, data map[string]string) string { + out := tpl + for k, v := range data { + out = strings.ReplaceAll(out, "{{"+k+"}}", v) + } + return out +} + +func stringPtr(s string) *string { return &s } + +// silence the unused-import warning when this file is the only consumer +// of these packages in some build configurations. +var ( + _ = uuid.Nil + _ = fmt.Sprintf + _ = auth.HashToken +) diff --git a/frontend/components/CommunicationsCard.vue b/frontend/components/CommunicationsCard.vue new file mode 100644 index 0000000..036e157 --- /dev/null +++ b/frontend/components/CommunicationsCard.vue @@ -0,0 +1,415 @@ + + + diff --git a/frontend/pages/dashboard/events/[id].vue b/frontend/pages/dashboard/events/[id].vue index 9f3a01e..51acfc8 100644 --- a/frontend/pages/dashboard/events/[id].vue +++ b/frontend/pages/dashboard/events/[id].vue @@ -69,8 +69,8 @@ const loading = ref(true) // (Collaborators + Branding, configured once) → Analytics (results, // checked periodically). The two action-y tabs anchor the ends; setup // clusters in the middle. -type EventTab = 'guests' | 'collaborators' | 'branding' | 'analytics' | 'gate' -const validTabs: EventTab[] = ['guests', 'collaborators', 'branding', 'analytics', 'gate'] +type EventTab = 'guests' | 'collaborators' | 'communications' | 'branding' | 'analytics' | 'gate' +const validTabs: EventTab[] = ['guests', 'collaborators', 'communications', 'branding', 'analytics', 'gate'] function tabFromHash(): EventTab { if (import.meta.client) { const h = window.location.hash.replace('#', '') as EventTab @@ -783,11 +783,12 @@ function checkLabel(band?: string): string { >