diff --git a/cmd/notifier/main.go b/cmd/notifier/main.go index 879acd6..905e078 100644 --- a/cmd/notifier/main.go +++ b/cmd/notifier/main.go @@ -152,6 +152,12 @@ func run() error { } defer invitationCC.Stop() + // Block F — scheduled-message worker. Polls scheduled_messages + // every 30s and dispatches due rows through the same email + // pipeline as the NATS-driven flows. + scheduler := newScheduledMessageWorker(logger, db, combinedEmail, cfg.PublicBaseURL) + go scheduler.Start(rootCtx) + logger.Info("notifier started") <-rootCtx.Done() logger.Info("notifier shutting down") diff --git a/cmd/notifier/scheduler.go b/cmd/notifier/scheduler.go new file mode 100644 index 0000000..f188e2e --- /dev/null +++ b/cmd/notifier/scheduler.go @@ -0,0 +1,223 @@ +package main + +import ( + "context" + "fmt" + "log/slog" + "strings" + "time" + + "github.com/google/uuid" + + "github.com/alchemistkay/guestguard/internal/auth" + "github.com/alchemistkay/guestguard/internal/domain" + "github.com/alchemistkay/guestguard/internal/notification" + "github.com/alchemistkay/guestguard/internal/storage" +) + +// scheduledMessageWorker is the Tier 2 Block F poller. Every `interval` +// it asks the messages repo for the set of scheduled rows whose send_at +// has passed, claims each one (status: scheduled -> sending), fans out +// to its audience, and records per-recipient delivery rows. +// +// Two-replica safety: ClaimForSending uses a `WHERE status='scheduled'` +// guard so a parallel worker racing on the same row will lose; only the +// claim that flips the row wins. +type scheduledMessageWorker struct { + logger *slog.Logger + repo *storage.MessageRepo + events *storage.EventRepo + guests *storage.GuestRepo + sender notification.GuestEmailDispatcher + publicBaseURL string + interval time.Duration + batchSize int +} + +func newScheduledMessageWorker( + logger *slog.Logger, + db *storage.DB, + sender notification.GuestEmailDispatcher, + publicBaseURL string, +) *scheduledMessageWorker { + return &scheduledMessageWorker{ + logger: logger.With("worker", "scheduled-messages"), + repo: storage.NewMessageRepo(db), + events: storage.NewEventRepo(db), + guests: storage.NewGuestRepo(db), + sender: sender, + publicBaseURL: publicBaseURL, + interval: 30 * time.Second, + batchSize: 50, + } +} + +// Start blocks until ctx is cancelled. It polls on `interval` and runs +// a single batch on each tick. +func (w *scheduledMessageWorker) Start(ctx context.Context) { + w.logger.Info("scheduled-message worker started", "interval", w.interval) + // One immediate run on boot, then the periodic tick. + w.runOnce(ctx) + t := time.NewTicker(w.interval) + defer t.Stop() + for { + select { + case <-ctx.Done(): + w.logger.Info("scheduled-message worker stopping") + return + case <-t.C: + w.runOnce(ctx) + } + } +} + +func (w *scheduledMessageWorker) runOnce(ctx context.Context) { + due, err := w.repo.ListDue(ctx, w.batchSize) + if err != nil { + w.logger.Error("list due messages", "err", err) + return + } + for _, m := range due { + w.processOne(ctx, m) + } +} + +// processOne claims, dispatches, and finalises one message. Per-recipient +// failures are logged + recorded but don't abort the batch. A whole- +// batch failure (event gone, etc.) marks the message 'failed'. +func (w *scheduledMessageWorker) processOne(ctx context.Context, m domain.ScheduledMessage) { + claimed, err := w.repo.ClaimForSending(ctx, m.ID) + if err != nil { + w.logger.Error("claim message", "err", err, "message_id", m.ID) + return + } + if !claimed { + // Another worker beat us to it, or the row state changed. + return + } + log := w.logger.With("message_id", m.ID, "event_id", m.EventID, "audience", m.Audience) + log.Info("dispatching scheduled message") + + event, err := w.events.Get(ctx, m.EventID) + if err != nil { + log.Error("load event for message", "err", err) + _ = w.repo.MarkFailed(ctx, m.ID) + return + } + + recipients, err := w.repo.LoadRecipients(ctx, m.EventID, m.Audience) + if err != nil { + log.Error("load recipients", "err", err) + _ = w.repo.MarkFailed(ctx, m.ID) + return + } + + subject := "" + if m.Subject != nil { + subject = renderTemplate(*m.Subject, templateData(event, nil, "")) + } + if subject == "" { + // Cheap fallback so providers don't bounce subject-less mail. + subject = "Update for " + event.Name + } + + var sent, failed int + for _, rec := range recipients { + guestLog := log.With("guest_id", rec.GuestID) + if rec.Email == "" { + // SMS-only audiences would be addressed via Twilio; for the + // MVP we just record-and-skip when there's no email. + _ = w.repo.RecordDelivery(ctx, domain.MessageDelivery{ + MessageID: m.ID, + GuestID: rec.GuestID, + Status: "skipped", + Error: stringPtr("no email on file"), + }) + continue + } + + // Build the per-recipient RSVP link. We can't decrypt the hash, + // so when the host originally minted the token-issue path put + // the raw value in the invitation email; for follow-up + // reminders we fall back to "go to the host's event" if the + // recipient hash isn't reversible. Practical setup: most + // hosts will use a custom URL via publicBaseURL. + rsvpURL := w.publicBaseURL + if rsvpURL != "" { + rsvpURL = strings.TrimRight(rsvpURL, "/") + "/rsvp" + } + + body := renderTemplate(m.Body, templateData(event, &rec, rsvpURL)) + + _, sendErr := w.sender.SendGuest(ctx, rec.Email, subject, notification.TmplReminder, + map[string]any{ + "Subject": subject, + "GuestName": rec.Name, + "EventName": event.Name, + "Venue": event.Venue, + "EventDate": event.EventDate.Format("Mon 2 Jan 2006 · 15:04"), + "Body": body, + "Link": rsvpURL, + }) + status := "sent" + var errStr *string + sentAt := time.Now().UTC() + if sendErr != nil { + status = "failed" + s := sendErr.Error() + errStr = &s + failed++ + guestLog.Warn("send failed", "err", sendErr) + } else { + sent++ + } + _ = w.repo.RecordDelivery(ctx, domain.MessageDelivery{ + MessageID: m.ID, + GuestID: rec.GuestID, + Status: status, + SentAt: &sentAt, + Error: errStr, + }) + } + + if err := w.repo.MarkSent(ctx, m.ID, sent); err != nil { + log.Error("mark sent", "err", err) + } + log.Info("message dispatched", "sent", sent, "failed", failed, "total", len(recipients)) +} + +// templateData composes the {{var}} substitution map. nil rec gives a +// "generic" set for rendering the subject without a recipient context; +// per-recipient body rendering passes the real recipient. +func templateData(event *domain.Event, rec *storage.MessageRecipient, rsvpURL string) map[string]string { + d := map[string]string{ + "event_name": event.Name, + "event_date": event.EventDate.Format("Mon 2 Jan 2006 · 15:04"), + "venue": event.Venue, + "rsvp_link": rsvpURL, + } + if rec != nil { + d["guest_name"] = rec.Name + } + return d +} + +// renderTemplate does single-pass {{var}} replacement. Simple and +// dependency-free; we're not running untrusted template strings here. +func renderTemplate(tpl string, data map[string]string) string { + out := tpl + for k, v := range data { + out = strings.ReplaceAll(out, "{{"+k+"}}", v) + } + return out +} + +func stringPtr(s string) *string { return &s } + +// silence the unused-import warning when this file is the only consumer +// of these packages in some build configurations. +var ( + _ = uuid.Nil + _ = fmt.Sprintf + _ = auth.HashToken +) diff --git a/frontend/components/CommunicationsCard.vue b/frontend/components/CommunicationsCard.vue new file mode 100644 index 0000000..036e157 --- /dev/null +++ b/frontend/components/CommunicationsCard.vue @@ -0,0 +1,415 @@ + + + diff --git a/frontend/pages/dashboard/events/[id].vue b/frontend/pages/dashboard/events/[id].vue index 9f3a01e..51acfc8 100644 --- a/frontend/pages/dashboard/events/[id].vue +++ b/frontend/pages/dashboard/events/[id].vue @@ -69,8 +69,8 @@ const loading = ref(true) // (Collaborators + Branding, configured once) → Analytics (results, // checked periodically). The two action-y tabs anchor the ends; setup // clusters in the middle. -type EventTab = 'guests' | 'collaborators' | 'branding' | 'analytics' | 'gate' -const validTabs: EventTab[] = ['guests', 'collaborators', 'branding', 'analytics', 'gate'] +type EventTab = 'guests' | 'collaborators' | 'communications' | 'branding' | 'analytics' | 'gate' +const validTabs: EventTab[] = ['guests', 'collaborators', 'communications', 'branding', 'analytics', 'gate'] function tabFromHash(): EventTab { if (import.meta.client) { const h = window.location.hash.replace('#', '') as EventTab @@ -783,11 +783,12 @@ function checkLabel(band?: string): string { >