from __future__ import annotations import asyncio import logging import nats from nats.aio.client import Client as NATSClient from nats.js import JetStreamContext from nats.js.api import ConsumerConfig, DeliverPolicy, RetentionPolicy, StorageType, StreamConfig from nats.js.errors import NotFoundError logger = logging.getLogger(__name__) STREAM_SUBJECTS = ["guest.>", "fraud.>", "rsvp.>", "invitation.>"] class NatsBus: def __init__(self, url: str, stream_name: str) -> None: self._url = url self._stream_name = stream_name self._nc: NATSClient | None = None self._js: JetStreamContext | None = None async def connect(self) -> None: self._nc = await nats.connect( self._url, name="guestguard-fraud-engine", max_reconnect_attempts=-1, reconnect_time_wait=2, ) self._js = self._nc.jetstream() await self._ensure_stream() logger.info("connected to nats", extra={"url": self._url}) async def _ensure_stream(self) -> None: assert self._js is not None try: await self._js.stream_info(self._stream_name) return except NotFoundError: pass cfg = StreamConfig( name=self._stream_name, subjects=STREAM_SUBJECTS, retention=RetentionPolicy.LIMITS, storage=StorageType.FILE, max_age=14 * 24 * 60 * 60 * 1_000_000_000, # 14 days, ns ) await self._js.add_stream(config=cfg) @property def js(self) -> JetStreamContext: if self._js is None: raise RuntimeError("nats not connected") return self._js async def subscribe( self, subject: str, durable: str, handler, manual_ack: bool = True, ): cfg = ConsumerConfig( durable_name=durable, ack_policy="explicit", deliver_policy=DeliverPolicy.ALL, max_deliver=5, ack_wait=30, filter_subject=subject, ) return await self.js.subscribe( subject=subject, durable=durable, cb=handler, manual_ack=manual_ack, config=cfg, ) async def publish(self, subject: str, payload: bytes) -> None: await self.js.publish(subject, payload) async def close(self) -> None: if self._nc is not None: await self._nc.drain() await asyncio.sleep(0) self._nc = None self._js = None