Files
Kwaku Danso 3f8bc58ca9 feat: build core API, fraud engine, notifier, and frontend
Phase 1 — Core API (Go):
- Events, guests, tokens, RSVPs CRUD on PostgreSQL via pgx/v5
- HMAC-signed per-guest tokens with format validation
- Health endpoint with DB ping, slog JSON logging, graceful shutdown

Phase 2 — NATS + Fraud Engine:
- NATS JetStream pub/sub with explicit-ack consumers
- Python/FastAPI fraud engine with heuristic risk scoring
  (fingerprint mismatch, IP change, missing signals, repeated access)
- gRPC sync scoring with 250ms fail-open timeout
- Per-guest baseline tracking; risk bands low/medium/high/block

Phase 3 — Notifications + Frontend:
- Notification worker scaffolding (Twilio/SES stubs, retry/backoff)
- Nuxt 3 frontend with Tailwind dark theme + brand green
- Live monitor via WebSocket with auto-reconnect
- Activity history endpoint backfills monitor with RSVPs +
  scored access checks (including blocked attempts)

UX polish:
- Marketing-friendly landing page (hero mockup, how-it-works,
  features, use cases, testimonials, FAQ, final CTA)
- Animated layered card mockups on landing + new-event page
- Plus-ones stepper, RSVP status badges, filter buttons
- Friendly access-check labels (Verified/Review/Suspicious/Blocked)
- Dashboard hydration fix via ClientOnly wrapper

Infrastructure:
- docker-compose for full local dev (postgres, nats, api,
  fraud-engine, notifier, frontend)
- Multi-stage Dockerfiles, non-root UID 1000
- Integration tests with testcontainers-go

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-11 21:08:56 +01:00

90 lines
2.5 KiB
Python

from __future__ import annotations
import asyncio
import logging
import nats
from nats.aio.client import Client as NATSClient
from nats.js import JetStreamContext
from nats.js.api import ConsumerConfig, DeliverPolicy, RetentionPolicy, StorageType, StreamConfig
from nats.js.errors import NotFoundError
logger = logging.getLogger(__name__)
STREAM_SUBJECTS = ["guest.>", "fraud.>", "rsvp.>", "invitation.>"]
class NatsBus:
def __init__(self, url: str, stream_name: str) -> None:
self._url = url
self._stream_name = stream_name
self._nc: NATSClient | None = None
self._js: JetStreamContext | None = None
async def connect(self) -> None:
self._nc = await nats.connect(
self._url,
name="guestguard-fraud-engine",
max_reconnect_attempts=-1,
reconnect_time_wait=2,
)
self._js = self._nc.jetstream()
await self._ensure_stream()
logger.info("connected to nats", extra={"url": self._url})
async def _ensure_stream(self) -> None:
assert self._js is not None
try:
await self._js.stream_info(self._stream_name)
return
except NotFoundError:
pass
cfg = StreamConfig(
name=self._stream_name,
subjects=STREAM_SUBJECTS,
retention=RetentionPolicy.LIMITS,
storage=StorageType.FILE,
max_age=14 * 24 * 60 * 60 * 1_000_000_000, # 14 days, ns
)
await self._js.add_stream(config=cfg)
@property
def js(self) -> JetStreamContext:
if self._js is None:
raise RuntimeError("nats not connected")
return self._js
async def subscribe(
self,
subject: str,
durable: str,
handler,
manual_ack: bool = True,
):
cfg = ConsumerConfig(
durable_name=durable,
ack_policy="explicit",
deliver_policy=DeliverPolicy.ALL,
max_deliver=5,
ack_wait=30,
filter_subject=subject,
)
return await self.js.subscribe(
subject=subject,
durable=durable,
cb=handler,
manual_ack=manual_ack,
config=cfg,
)
async def publish(self, subject: str, payload: bytes) -> None:
await self.js.publish(subject, payload)
async def close(self) -> None:
if self._nc is not None:
await self._nc.drain()
await asyncio.sleep(0)
self._nc = None
self._js = None