package main

import (
	"context"
	"fmt"
	"log/slog"
	"strings"
	"time"

	"github.com/google/uuid"

	"github.com/alchemistkay/guestguard/internal/auth"
	"github.com/alchemistkay/guestguard/internal/domain"
	"github.com/alchemistkay/guestguard/internal/notification"
	"github.com/alchemistkay/guestguard/internal/storage"
)

// scheduledMessageWorker is the Tier 2 Block F poller. Every `interval`
// it asks the messages repo for the set of scheduled rows whose send_at
// has passed, claims each one (status: scheduled -> sending), fans out
// to its audience, and records per-recipient delivery rows.
//
// Two-replica safety: ClaimForSending uses a `WHERE status='scheduled'`
// guard so a parallel worker racing on the same row will lose; only the
// claim that flips the row wins.
type scheduledMessageWorker struct {
	logger        *slog.Logger
	repo          *storage.MessageRepo
	events        *storage.EventRepo
	guests        *storage.GuestRepo
	sender        notification.GuestEmailDispatcher
	publicBaseURL string        // base of the RSVP link placed in outgoing mail; may be empty
	interval      time.Duration // delay between polls
	batchSize     int           // limit passed to ListDue on each poll
}

// newScheduledMessageWorker wires a worker against db with the default
// polling cadence (30s) and batch size (50). The logger is tagged with
// worker=scheduled-messages so its lines can be filtered.
func newScheduledMessageWorker(
	logger *slog.Logger,
	db *storage.DB,
	sender notification.GuestEmailDispatcher,
	publicBaseURL string,
) *scheduledMessageWorker {
	return &scheduledMessageWorker{
		logger:        logger.With("worker", "scheduled-messages"),
		repo:          storage.NewMessageRepo(db),
		events:        storage.NewEventRepo(db),
		guests:        storage.NewGuestRepo(db),
		sender:        sender,
		publicBaseURL: publicBaseURL,
		interval:      30 * time.Second,
		batchSize:     50,
	}
}

// Start blocks until ctx is cancelled. It polls on `interval` and runs
// a single batch on each tick.
func (w *scheduledMessageWorker) Start(ctx context.Context) {
	w.logger.Info("scheduled-message worker started", "interval", w.interval)

	// One immediate run on boot, then the periodic tick.
	w.runOnce(ctx)

	t := time.NewTicker(w.interval)
	defer t.Stop()
	for {
		select {
		case <-ctx.Done():
			w.logger.Info("scheduled-message worker stopping")
			return
		case <-t.C:
			w.runOnce(ctx)
		}
	}
}

// runOnce fetches up to batchSize due messages and processes them one
// after another. A failure to list is logged and ends this run; the next
// tick tries again.
func (w *scheduledMessageWorker) runOnce(ctx context.Context) {
	due, err := w.repo.ListDue(ctx, w.batchSize)
	if err != nil {
		w.logger.Error("list due messages", "err", err)
		return
	}
	for _, m := range due {
		// Stop claiming once shutdown has begun. A row claimed now would
		// be flipped to 'sending' while every follow-up call runs on an
		// already-cancelled context; leaving it 'scheduled' lets a later
		// run (or another replica) pick it up.
		if ctx.Err() != nil {
			return
		}
		w.processOne(ctx, m)
	}
}

// processOne claims, dispatches, and finalises one message. Per-recipient
// failures are logged + recorded but don't abort the batch. A whole-
// batch failure (event gone, etc.) marks the message 'failed'.
//
// Bookkeeping errors (MarkFailed, RecordDelivery, MarkSent) are logged
// rather than returned: the worker is best-effort, but a row left in the
// wrong state should at least be visible in the logs.
func (w *scheduledMessageWorker) processOne(ctx context.Context, m domain.ScheduledMessage) {
	// Layout used for the event date in the email payload.
	const eventDateLayout = "Mon 2 Jan 2006 · 15:04"

	claimed, err := w.repo.ClaimForSending(ctx, m.ID)
	if err != nil {
		w.logger.Error("claim message", "err", err, "message_id", m.ID)
		return
	}
	if !claimed {
		// Another worker beat us to it, or the row state changed.
		return
	}

	log := w.logger.With("message_id", m.ID, "event_id", m.EventID, "audience", m.Audience)
	log.Info("dispatching scheduled message")

	// markFailed flags the whole message as failed and surfaces a
	// bookkeeping error instead of dropping it.
	markFailed := func() {
		if err := w.repo.MarkFailed(ctx, m.ID); err != nil {
			log.Error("mark failed", "err", err)
		}
	}
	// record persists one delivery row, logging (not aborting) on error
	// so the remaining recipients are still attempted.
	record := func(l *slog.Logger, d domain.MessageDelivery) {
		if err := w.repo.RecordDelivery(ctx, d); err != nil {
			l.Error("record delivery", "err", err, "status", d.Status)
		}
	}

	event, err := w.events.Get(ctx, m.EventID)
	if err != nil {
		log.Error("load event for message", "err", err)
		markFailed()
		return
	}

	recipients, err := w.repo.LoadRecipients(ctx, m.EventID, m.Audience)
	if err != nil {
		log.Error("load recipients", "err", err)
		markFailed()
		return
	}

	// The subject is rendered once, without recipient context, so every
	// guest receives the same subject line.
	subject := ""
	if m.Subject != nil {
		subject = renderTemplate(*m.Subject, templateData(event, nil, ""))
	}
	if subject == "" {
		// Cheap fallback so providers don't bounce subject-less mail.
		subject = "Update for " + event.Name
	}

	// RSVP link. The per-guest token is only stored hashed and cannot be
	// recovered here (the raw value went out in the original invitation
	// email), so follow-up messages link to a generic "<publicBaseURL>/rsvp"
	// page. The link is identical for every recipient, so build it once;
	// it stays empty when no public base URL is configured.
	rsvpURL := w.publicBaseURL
	if rsvpURL != "" {
		rsvpURL = strings.TrimRight(rsvpURL, "/") + "/rsvp"
	}

	var sent, failed int
	for _, rec := range recipients {
		guestLog := log.With("guest_id", rec.GuestID)

		if rec.Email == "" {
			// SMS-only audiences would be addressed via Twilio; for the
			// MVP we just record-and-skip when there's no email.
			record(guestLog, domain.MessageDelivery{
				MessageID: m.ID,
				GuestID:   rec.GuestID,
				Status:    "skipped",
				Error:     stringPtr("no email on file"),
			})
			continue
		}

		body := renderTemplate(m.Body, templateData(event, &rec, rsvpURL))
		_, sendErr := w.sender.SendGuest(ctx, rec.Email, subject, notification.TmplReminder, map[string]any{
			"Subject":   subject,
			"GuestName": rec.Name,
			"EventName": event.Name,
			"Venue":     event.Venue,
			"EventDate": event.EventDate.Format(eventDateLayout),
			"Body":      body,
			"Link":      rsvpURL,
		})

		status := "sent"
		var errStr *string
		// Timestamp of the attempt; recorded for failures as well.
		sentAt := time.Now().UTC()
		if sendErr != nil {
			status = "failed"
			s := sendErr.Error()
			errStr = &s
			failed++
			guestLog.Warn("send failed", "err", sendErr)
		} else {
			sent++
		}

		record(guestLog, domain.MessageDelivery{
			MessageID: m.ID,
			GuestID:   rec.GuestID,
			Status:    status,
			SentAt:    &sentAt,
			Error:     errStr,
		})
	}

	if err := w.repo.MarkSent(ctx, m.ID, sent); err != nil {
		log.Error("mark sent", "err", err)
	}
	log.Info("message dispatched", "sent", sent, "failed", failed, "total", len(recipients))
}

// templateData composes the {{var}} substitution map. nil rec gives a
// "generic" set for rendering the subject without a recipient context;
// per-recipient body rendering passes the real recipient.
func templateData(event *domain.Event, rec *storage.MessageRecipient, rsvpURL string) map[string]string { d := map[string]string{ "event_name": event.Name, "event_date": event.EventDate.Format("Mon 2 Jan 2006 · 15:04"), "venue": event.Venue, "rsvp_link": rsvpURL, } if rec != nil { d["guest_name"] = rec.Name } return d } // renderTemplate does single-pass {{var}} replacement. Simple and // dependency-free; we're not running untrusted template strings here. func renderTemplate(tpl string, data map[string]string) string { out := tpl for k, v := range data { out = strings.ReplaceAll(out, "{{"+k+"}}", v) } return out } func stringPtr(s string) *string { return &s } // silence the unused-import warning when this file is the only consumer // of these packages in some build configurations. var ( _ = uuid.Nil _ = fmt.Sprintf _ = auth.HashToken )