//go:build integration package integration_test import ( "bytes" "context" "encoding/json" "fmt" "io" "log/slog" "net" "net/http" "net/http/httptest" "sync/atomic" "testing" "time" "github.com/google/uuid" "github.com/jackc/pgx/v5/pgxpool" "github.com/nats-io/nats.go/jetstream" "github.com/testcontainers/testcontainers-go" tcpostgres "github.com/testcontainers/testcontainers-go/modules/postgres" "github.com/testcontainers/testcontainers-go/wait" "google.golang.org/grpc" "github.com/alchemistkay/guestguard/internal/api" "github.com/alchemistkay/guestguard/internal/auth" "github.com/alchemistkay/guestguard/internal/fraud" pb "github.com/alchemistkay/guestguard/internal/fraudpb" "github.com/alchemistkay/guestguard/internal/natspub" "github.com/alchemistkay/guestguard/internal/storage" ) // TestE2EHappyPath spins up real Postgres + NATS containers and an in-process // stub fraud gRPC server, then walks both the async (access → fraud.scored // → access_logs.flagged) and sync (RSVP submit) flows we manually verified // with `docker compose up`. This is the regression net for that walkthrough. func TestE2EHappyPath(t *testing.T) { if testing.Short() { t.Skip("skipping integration test in -short mode") } ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute) t.Cleanup(cancel) logger := slog.New(slog.NewTextHandler(io.Discard, nil)) dsn := startPostgres(t, ctx) natsURL := startNATS(t, ctx) db, err := storage.NewDB(ctx, dsn) must(t, err, "connect db") t.Cleanup(db.Close) must(t, db.Migrate(ctx), "migrate") natsClient, err := natspub.Connect(ctx, natsURL, logger) must(t, err, "connect nats") t.Cleanup(natsClient.Close) stub := startStubFraudGRPC(t) fraudClient, err := fraud.Dial(ctx, stub.Addr, 2*time.Second, logger) must(t, err, "dial fraud") t.Cleanup(func() { _ = fraudClient.Close() }) accessLogs := storage.NewAccessLogRepo(db) sub, err := natspub.NewFraudScoredSubscriber(ctx, natsClient, "test-fraud-scored", func(ctx context.Context, evt natspub.FraudScored) error { return accessLogs.ApplyScore(ctx, storage.ApplyScoreParams{ AccessLogID: evt.AccessLogID, Score: evt.Score, Reasons: evt.Reasons, Flagged: evt.Score >= 60, }) }, logger) must(t, err, "create fraud subscriber") consumeCtx, err := sub.Start(ctx) must(t, err, "start fraud subscriber") t.Cleanup(consumeCtx.Stop) rsvpCounter := subscribeRSVPConfirmed(t, ctx, natsClient) apiSrv, err := api.NewServer(api.ServerDeps{ Logger: logger, DB: db, AccessPublisher: natsClient, RSVPPublisher: natsClient, FraudScorer: fraudClient, TokenTTL: 24 * time.Hour, JWTSecret: "test-secret-must-be-at-least-32-bytes-long-xx", JWTIssuer: "guestguard-test", AccessTokenTTL: 15 * time.Minute, RefreshTokenTTL: 24 * time.Hour, EmailVerificationTTL: 1 * time.Hour, PasswordResetTTL: 1 * time.Hour, PublicBaseURL: "http://localhost", }) must(t, err, "build api server") srv := httptest.NewServer(apiSrv.Handler()) t.Cleanup(srv.Close) hostID := insertHost(t, ctx, db.Pool) hostToken := issueHostToken(t, hostID) t.Run("async access flow flags access_logs", func(t *testing.T) { eventID := createEvent(t, srv.URL, hostToken, "Async Test", "async-test") guestID := createGuest(t, srv.URL, hostToken, eventID, "Async Guest") token := issueToken(t, srv.URL, hostToken, eventID, guestID) accessResp := getAccess(t, srv.URL, token) stub.SetNext(72, "high", []string{"fingerprint differs from baseline"}) // Simulate the fraud-engine side of the pipeline: the engine consumes // access.attempted from NATS and publishes fraud.scored back. We do // the same publish directly so we don't need the Python service in the // test. mustPublishFraudScored(t, ctx, natsClient, natspub.FraudScored{ EventID: eventID, GuestID: guestID, TokenID: accessResp.Token.ID, AccessLogID: accessResp.AccessLog, Score: 72, Risk: "high", Reasons: []string{"fingerprint differs from baseline"}, ScoredAt: time.Now().UTC(), }) waitForFlagged(t, ctx, db.Pool, accessResp.AccessLog, 72, true) }) t.Run("sync rsvp flow records rsvp and marks token used", func(t *testing.T) { eventID := createEvent(t, srv.URL, hostToken, "Sync Test", "sync-test") guestID := createGuest(t, srv.URL, hostToken, eventID, "Sync Guest") token := issueToken(t, srv.URL, hostToken, eventID, guestID) stub.SetNext(15, "low", nil) rsvpResp := submitRSVP(t, srv.URL, token, map[string]any{ "response": "attending", "plus_ones": 0, }) if rsvpResp.Blocked { t.Fatalf("expected blocked=false, got %+v", rsvpResp) } if rsvpResp.Decision.Score != 15 || rsvpResp.Decision.Risk != "low" || !rsvpResp.Decision.Used { t.Fatalf("unexpected decision: %+v", rsvpResp.Decision) } if rsvpResp.RSVP == nil || rsvpResp.RSVP.RiskScore == nil || *rsvpResp.RSVP.RiskScore != 15 { t.Fatalf("rsvp missing risk_score=15: %+v", rsvpResp.RSVP) } assertTokenUsed(t, ctx, db.Pool, guestID) waitForRSVPConfirmed(t, rsvpCounter, 1) }) t.Run("sync rsvp flow blocks when fraud score is BLOCK", func(t *testing.T) { eventID := createEvent(t, srv.URL, hostToken, "Block Test", "block-test") guestID := createGuest(t, srv.URL, hostToken, eventID, "Block Guest") token := issueToken(t, srv.URL, hostToken, eventID, guestID) stub.SetNext(95, "block", []string{"fingerprint differs from baseline", "ip address changed"}) req, _ := http.NewRequestWithContext(ctx, http.MethodPost, srv.URL+"/rsvp/"+token, bytes.NewReader([]byte(`{"response":"attending","plus_ones":0}`))) req.Header.Set("Content-Type", "application/json") resp, err := http.DefaultClient.Do(req) must(t, err, "POST /rsvp") defer resp.Body.Close() if resp.StatusCode != http.StatusForbidden { body, _ := io.ReadAll(resp.Body) t.Fatalf("expected 403 for BLOCK, got %d: %s", resp.StatusCode, body) } assertNoRSVP(t, ctx, db.Pool, guestID) assertTokenStatus(t, ctx, db.Pool, guestID, "active") }) } // --- container helpers --- func startPostgres(t *testing.T, ctx context.Context) string { t.Helper() c, err := tcpostgres.Run(ctx, "postgres:16-alpine", tcpostgres.WithDatabase("guestguard"), tcpostgres.WithUsername("guestguard"), tcpostgres.WithPassword("guestguard"), testcontainers.WithWaitStrategy( wait.ForLog("database system is ready to accept connections"). WithOccurrence(2). WithStartupTimeout(60*time.Second), ), ) must(t, err, "start postgres container") t.Cleanup(func() { _ = c.Terminate(context.Background()) }) dsn, err := c.ConnectionString(ctx, "sslmode=disable") must(t, err, "postgres connection string") return dsn } func startNATS(t *testing.T, ctx context.Context) string { t.Helper() req := testcontainers.ContainerRequest{ Image: "nats:2.10-alpine", ExposedPorts: []string{"4222/tcp"}, Cmd: []string{"-js"}, WaitingFor: wait.ForLog("Server is ready").WithStartupTimeout(60 * time.Second), } c, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{ ContainerRequest: req, Started: true, }) must(t, err, "start nats container") t.Cleanup(func() { _ = c.Terminate(context.Background()) }) host, err := c.Host(ctx) must(t, err, "nats host") port, err := c.MappedPort(ctx, "4222/tcp") must(t, err, "nats port") return fmt.Sprintf("nats://%s:%s", host, port.Port()) } // --- stub fraud gRPC server --- type stubFraud struct { pb.UnimplementedFraudServiceServer Addr string server *grpc.Server score atomic.Int32 risk atomic.Value // string reasons atomic.Value // []string } func (s *stubFraud) Score(ctx context.Context, req *pb.ScoreRequest) (*pb.ScoreResponse, error) { risk := pb.Risk_RISK_LOW switch s.risk.Load().(string) { case "low": risk = pb.Risk_RISK_LOW case "medium": risk = pb.Risk_RISK_MEDIUM case "high": risk = pb.Risk_RISK_HIGH case "block": risk = pb.Risk_RISK_BLOCK } var reasons []string if r, _ := s.reasons.Load().([]string); r != nil { reasons = r } return &pb.ScoreResponse{ Score: s.score.Load(), Risk: risk, Reasons: reasons, }, nil } func (s *stubFraud) SetNext(score int, risk string, reasons []string) { s.score.Store(int32(score)) s.risk.Store(risk) s.reasons.Store(reasons) } func startStubFraudGRPC(t *testing.T) *stubFraud { t.Helper() lis, err := net.Listen("tcp", "127.0.0.1:0") must(t, err, "listen for stub fraud") s := &stubFraud{Addr: lis.Addr().String()} s.risk.Store("low") s.reasons.Store([]string(nil)) s.server = grpc.NewServer() pb.RegisterFraudServiceServer(s.server, s) go func() { _ = s.server.Serve(lis) }() t.Cleanup(s.server.Stop) return s } // --- HTTP helpers --- func createEvent(t *testing.T, base, accessToken string, name, slug string) uuid.UUID { t.Helper() body := map[string]any{ "name": name, "slug": slug, "event_date": time.Now().Add(30 * 24 * time.Hour).UTC().Format(time.RFC3339), "venue": "Integration Hall", } var out struct{ ID uuid.UUID `json:"id"` } postJSONAuthed(t, base+"/events", accessToken, body, http.StatusCreated, &out) return out.ID } func createGuest(t *testing.T, base, accessToken string, eventID uuid.UUID, name string) uuid.UUID { t.Helper() var out struct{ ID uuid.UUID `json:"id"` } postJSONAuthed(t, fmt.Sprintf("%s/events/%s/guests", base, eventID), accessToken, map[string]any{"name": name}, http.StatusCreated, &out) return out.ID } func issueToken(t *testing.T, base, accessToken string, eventID, guestID uuid.UUID) string { t.Helper() var out struct{ Token string `json:"token"` } postJSONAuthed(t, fmt.Sprintf("%s/events/%s/guests/%s/tokens", base, eventID, guestID), accessToken, nil, http.StatusCreated, &out) return out.Token } type accessResponse struct { Token *struct{ ID uuid.UUID `json:"id"` } `json:"token"` AccessLog uuid.UUID `json:"access_log_id"` } func getAccess(t *testing.T, base, token string) accessResponse { t.Helper() resp, err := http.Get(base + "/access/" + token) must(t, err, "GET /access") defer resp.Body.Close() if resp.StatusCode != http.StatusOK { body, _ := io.ReadAll(resp.Body) t.Fatalf("GET /access status=%d body=%s", resp.StatusCode, body) } var out accessResponse must(t, json.NewDecoder(resp.Body).Decode(&out), "decode access") return out } type submitRSVPResponse struct { RSVP *struct { ID uuid.UUID `json:"id"` RiskScore *int `json:"risk_score"` } `json:"rsvp"` Decision fraud.Decision `json:"fraud"` Blocked bool `json:"blocked"` } func submitRSVP(t *testing.T, base, token string, body map[string]any) submitRSVPResponse { t.Helper() var out submitRSVPResponse postJSON(t, base+"/rsvp/"+token, body, http.StatusCreated, &out) return out } func postJSON(t *testing.T, url string, body any, wantStatus int, out any) { t.Helper() postJSONAuthed(t, url, "", body, wantStatus, out) } func postJSONAuthed(t *testing.T, url, bearer string, body any, wantStatus int, out any) { t.Helper() var rdr io.Reader if body != nil { b, _ := json.Marshal(body) rdr = bytes.NewReader(b) } req, err := http.NewRequest(http.MethodPost, url, rdr) must(t, err, "build request "+url) if rdr != nil { req.Header.Set("Content-Type", "application/json") } if bearer != "" { req.Header.Set("Authorization", "Bearer "+bearer) } resp, err := http.DefaultClient.Do(req) must(t, err, "do request "+url) defer resp.Body.Close() if resp.StatusCode != wantStatus { body, _ := io.ReadAll(resp.Body) t.Fatalf("%s status=%d want=%d body=%s", url, resp.StatusCode, wantStatus, body) } if out != nil { must(t, json.NewDecoder(resp.Body).Decode(out), "decode response from "+url) } } // --- DB helpers --- func insertHost(t *testing.T, ctx context.Context, pool *pgxpool.Pool) uuid.UUID { t.Helper() var id uuid.UUID err := pool.QueryRow(ctx, `INSERT INTO users (email, name, email_verified, email_verified_at) VALUES ($1, $2, TRUE, now()) RETURNING id`, fmt.Sprintf("test-%d@guestguard.test", time.Now().UnixNano()), "Integration Host", ).Scan(&id) must(t, err, "insert host") // Default test hosts to the Business tier so existing tests that // create multiple events for one host aren't tripped up by the // free-tier limit (1 event / month). Tests that specifically exercise // the free-tier path skip this helper. grantBusinessTier(t, ctx, pool, id) return id } // grantBusinessTier inserts an active Business subscription row for the // given user so tier-enforcement middleware grants unlimited events. func grantBusinessTier(t *testing.T, ctx context.Context, pool *pgxpool.Pool, userID uuid.UUID) { t.Helper() _, err := pool.Exec(ctx, ` INSERT INTO subscriptions (user_id, stripe_customer_id, tier, status) VALUES ($1::uuid, 'cus_test_' || replace($1::uuid::text, '-', ''), 'business', 'active') `, userID.String()) must(t, err, "grant business tier") } // issueHostToken mints a Bearer access token for an existing host using the // same JWT secret/issuer the test API server was constructed with. This // lets integration tests skip the signup/verify/login dance. func issueHostToken(t *testing.T, hostID uuid.UUID) string { t.Helper() signer, err := auth.NewJWTSigner(testJWTSecret, 5*time.Minute, testJWTIssuer) must(t, err, "build jwt signer") tok, _, err := signer.Issue(hostID, time.Now()) must(t, err, "issue jwt") return tok } const ( testJWTSecret = "test-secret-must-be-at-least-32-bytes-long-xx" testJWTIssuer = "guestguard-test" ) func waitForFlagged(t *testing.T, ctx context.Context, pool *pgxpool.Pool, accessLogID uuid.UUID, wantScore int, wantFlagged bool) { t.Helper() deadline := time.Now().Add(10 * time.Second) for time.Now().Before(deadline) { var ( score *int flagged bool ) err := pool.QueryRow(ctx, `SELECT risk_score, flagged FROM access_logs WHERE id = $1`, accessLogID, ).Scan(&score, &flagged) if err == nil && score != nil && *score == wantScore && flagged == wantFlagged { return } time.Sleep(100 * time.Millisecond) } t.Fatalf("access_log %s did not reach score=%d flagged=%v within 10s", accessLogID, wantScore, wantFlagged) } func assertTokenUsed(t *testing.T, ctx context.Context, pool *pgxpool.Pool, guestID uuid.UUID) { t.Helper() var status string err := pool.QueryRow(ctx, `SELECT status FROM tokens WHERE guest_id = $1`, guestID, ).Scan(&status) must(t, err, "load token status") if status != "used" { t.Fatalf("expected token status=used for guest %s, got %s", guestID, status) } } func assertTokenStatus(t *testing.T, ctx context.Context, pool *pgxpool.Pool, guestID uuid.UUID, want string) { t.Helper() var status string err := pool.QueryRow(ctx, `SELECT status FROM tokens WHERE guest_id = $1`, guestID, ).Scan(&status) must(t, err, "load token status") if status != want { t.Fatalf("expected token status=%s for guest %s, got %s", want, guestID, status) } } func assertNoRSVP(t *testing.T, ctx context.Context, pool *pgxpool.Pool, guestID uuid.UUID) { t.Helper() var n int err := pool.QueryRow(ctx, `SELECT count(*) FROM rsvps WHERE guest_id = $1`, guestID, ).Scan(&n) must(t, err, "count rsvps") if n != 0 { t.Fatalf("expected 0 rsvps for blocked guest %s, got %d", guestID, n) } } // --- NATS helpers --- func mustPublishFraudScored(t *testing.T, ctx context.Context, c *natspub.Client, evt natspub.FraudScored) { t.Helper() body, _ := json.Marshal(evt) _, err := c.JetStream().Publish(ctx, natspub.SubjectFraudScored, body) must(t, err, "publish fraud.scored") } func subscribeRSVPConfirmed(t *testing.T, ctx context.Context, c *natspub.Client) *atomic.Int32 { t.Helper() cons, err := c.JetStream().CreateOrUpdateConsumer(ctx, natspub.StreamName, jetstream.ConsumerConfig{ Durable: "test-rsvp-confirmed", Name: "test-rsvp-confirmed", FilterSubject: natspub.SubjectRSVPConfirmed, AckPolicy: jetstream.AckExplicitPolicy, DeliverPolicy: jetstream.DeliverAllPolicy, }) must(t, err, "create rsvp consumer") var counter atomic.Int32 cc, err := cons.Consume(func(msg jetstream.Msg) { counter.Add(1) _ = msg.Ack() }) must(t, err, "consume rsvp.confirmed") t.Cleanup(cc.Stop) return &counter } func waitForRSVPConfirmed(t *testing.T, counter *atomic.Int32, want int32) { t.Helper() deadline := time.Now().Add(5 * time.Second) for time.Now().Before(deadline) { if counter.Load() >= want { return } time.Sleep(50 * time.Millisecond) } t.Fatalf("expected %d rsvp.confirmed events, saw %d", want, counter.Load()) } // --- misc --- func must(t *testing.T, err error, op string) { t.Helper() if err != nil { t.Fatalf("%s: %v", op, err) } }