"""gRPC front end that exposes the heuristic fraud scorer as ``FraudService``."""

from __future__ import annotations

import asyncio
import logging
from datetime import UTC, datetime
from uuid import UUID

import grpc

from app.schemas import AccessAttempted
from app.scoring import BLOCK, HIGH, LOW, MEDIUM, HeuristicScorer, risk_band
from fraud.v1 import fraud_pb2, fraud_pb2_grpc

logger = logging.getLogger(__name__)

# Lookup from the scorer's risk bands to the matching protobuf enum values.
_RISK_TO_PROTO = {
    LOW: fraud_pb2.RISK_LOW,
    MEDIUM: fraud_pb2.RISK_MEDIUM,
    HIGH: fraud_pb2.RISK_HIGH,
    BLOCK: fraud_pb2.RISK_BLOCK,
}


def _access_attempt_from(request: fraud_pb2.ScoreRequest) -> AccessAttempted:
    """Convert a ``ScoreRequest`` into an ``AccessAttempted`` event.

    Unset (falsy) optional request fields are normalised: a missing
    ``access_log_id`` becomes the all-zero UUID, an empty ``fingerprint``
    becomes ``None``, and empty ``ip_address`` / ``user_agent`` / ``referrer``
    values become ``None``. ``occurred_at`` is stamped with the current UTC
    time rather than read from the request.

    Any exception raised while parsing the identifiers or constructing the
    event propagates to the caller.
    """
    event_id = UUID(request.event_id)
    guest_id = UUID(request.guest_id)
    token_id = UUID(request.token_id)

    if request.access_log_id:
        access_log_id = UUID(request.access_log_id)
    else:
        # No access-log reference supplied: fall back to the nil UUID.
        access_log_id = UUID(int=0)

    if request.fingerprint:
        fingerprint = dict(request.fingerprint)
    else:
        fingerprint = None

    return AccessAttempted(
        event_id=event_id,
        guest_id=guest_id,
        token_id=token_id,
        access_log_id=access_log_id,
        fingerprint=fingerprint,
        ip_address=request.ip_address or None,
        user_agent=request.user_agent or None,
        referrer=request.referrer or None,
        occurred_at=datetime.now(UTC),
    )


class FraudServicer(fraud_pb2_grpc.FraudServiceServicer):
    """``FraudService`` implementation that delegates to a ``HeuristicScorer``."""

    def __init__(self, scorer: HeuristicScorer) -> None:
        """Keep a reference to the scorer used for every ``Score`` call."""
        self._scorer = scorer

    async def Score(  # noqa: N802 — gRPC method
        self,
        request: fraud_pb2.ScoreRequest,
        context: grpc.aio.ServicerContext,
    ) -> fraud_pb2.ScoreResponse:
        """Score one access attempt and report its risk band.

        The request is first converted into an ``AccessAttempted`` event. If
        that conversion raises ``ValueError`` or ``TypeError`` the RPC is
        aborted with ``INVALID_ARGUMENT`` and a ``bad request: ...`` detail
        message. Otherwise the event is handed to the scorer, and the
        resulting score, its protobuf risk value and the scorer's reasons are
        returned. A band with no entry in ``_RISK_TO_PROTO`` is reported as
        ``RISK_UNSPECIFIED``.
        """
        try:
            attempt = _access_attempt_from(request)
        except (ValueError, TypeError) as exc:
            await context.abort(grpc.StatusCode.INVALID_ARGUMENT, f"bad request: {exc}")
            raise  # unreachable, abort raises

        outcome = self._scorer.score(attempt)
        proto_risk = _RISK_TO_PROTO.get(
            risk_band(outcome.score),
            fraud_pb2.RISK_UNSPECIFIED,
        )
        return fraud_pb2.ScoreResponse(
            score=outcome.score,
            risk=proto_risk,
            reasons=outcome.reasons,
        )


async def serve_grpc(scorer: HeuristicScorer, addr: str) -> grpc.aio.Server:
    """Start an asyncio gRPC server hosting ``FraudServicer`` on *addr*.

    The port is added with ``add_insecure_port`` (no credentials are
    configured here). The already-started server is returned so the caller
    can later pass it to ``stop_grpc``.
    """
    grpc_server = grpc.aio.server()
    servicer = FraudServicer(scorer)
    fraud_pb2_grpc.add_FraudServiceServicer_to_server(servicer, grpc_server)
    grpc_server.add_insecure_port(addr)

    await grpc_server.start()
    logger.info("grpc server started", extra={"addr": addr})
    return grpc_server


async def stop_grpc(server: grpc.aio.Server) -> None:
    """Stop *server* with a 2-second grace period, then yield to the loop once."""
    await server.stop(grace=2.0)
    # A zero-length sleep hands control back to the event loop for one turn;
    # presumably this lets pending shutdown callbacks run — TODO confirm.
    await asyncio.sleep(0)