Skip to content

NatsBroker

faststream.nats.broker.NatsBroker #

NatsBroker(
    servers: str
    | Sequence[str] = ("nats://localhost:4222"),
    *,
    security: Optional[BaseSecurity] = None,
    protocol: str = "nats",
    protocol_version: str | None = "custom",
    **kwargs: Any
)

Bases: NatsLoggingMixin, BrokerAsyncUsecase[Msg, Client]

A class to represent a NATS broker.

Initialize the NatsBroker object.

PARAMETER DESCRIPTION
servers

The NATS server(s) to connect to.

TYPE: Union[str, Sequence[str]] DEFAULT: ('nats://localhost:4222')

security

The security options.

TYPE: Optional[BaseSecurity] DEFAULT: None

protocol

The protocol to use.

TYPE: str DEFAULT: 'nats'

protocol_version

The protocol version to use.

TYPE: Optional[str] DEFAULT: 'custom'

**kwargs

Additional keyword arguments.

TYPE: Any DEFAULT: {}

Source code in faststream/nats/broker.py
def __init__(
    self,
    servers: Union[str, Sequence[str]] = ("nats://localhost:4222",),
    *,
    security: Optional[BaseSecurity] = None,
    protocol: str = "nats",
    protocol_version: Optional[str] = "custom",
    **kwargs: Any,
) -> None:
    """Initialize the NatsBroker object.

    Args:
        servers (Union[str, Sequence[str]]): The NATS server(s) to connect to.
        security (Optional[BaseSecurity]): The security options.
        protocol (str): The protocol to use.
        protocol_version (Optional[str]): The protocol version to use.
        **kwargs (Any): Additional keyword arguments.
    """
    kwargs.update(parse_security(security))

    if kwargs.get("tls"):  # pragma: no cover
        warnings.warn(
            (
                "\nNATS `tls` option was deprecated and will be removed in 0.4.0"
                "\nPlease, use `security` with `BaseSecurity` or `SASLPlaintext` instead"
            ),
            DeprecationWarning,
            stacklevel=2,
        )

    super().__init__(
        url=([servers] if isinstance(servers, str) else list(servers)),
        protocol=protocol,
        protocol_version=protocol_version,
        security=security,
        **kwargs,
    )

    self.__is_connected = False
    self._producer = None

    # JS options
    self.stream = None
    self._js_producer = None

dependencies instance-attribute #

dependencies: Sequence[Depends] = dependencies

description instance-attribute #

description = description

fmt property #

fmt: str

graceful_timeout instance-attribute #

graceful_timeout = graceful_timeout

handlers instance-attribute #

handlers: dict[Subject, Handler]

log_level instance-attribute #

log_level: int

logger instance-attribute #

logger: Optional[Logger]

middlewares instance-attribute #

middlewares: Sequence[Callable[[MsgType], BaseMiddleware]]

protocol instance-attribute #

protocol = protocol

protocol_version instance-attribute #

protocol_version = protocol_version

security instance-attribute #

security = security

started instance-attribute #

started: bool = False

stream instance-attribute #

stream: JetStreamContext | None = None

tags instance-attribute #

tags = tags

url instance-attribute #

url: List[str]

close async #

close(
    exc_type: Optional[Type[BaseException]] = None,
    exc_val: Optional[BaseException] = None,
    exec_tb: Optional[TracebackType] = None,
) -> None

Closes the object.

PARAMETER DESCRIPTION
exc_type

The type of the exception being handled, if any.

TYPE: Optional[Type[BaseException]] DEFAULT: None

exc_val

The exception instance being handled, if any.

TYPE: Optional[BaseException] DEFAULT: None

exec_tb

The traceback of the exception being handled, if any.

TYPE: Optional[TracebackType] DEFAULT: None

RETURNS DESCRIPTION
None

None

RAISES DESCRIPTION
NotImplementedError

If the method is not implemented.

Source code in faststream/broker/core/asynchronous.py
async def close(
    self,
    exc_type: Optional[Type[BaseException]] = None,
    exc_val: Optional[BaseException] = None,
    exec_tb: Optional[TracebackType] = None,
) -> None:
    """Closes the object.

    Args:
        exc_type: The type of the exception being handled, if any.
        exc_val: The exception instance being handled, if any.
        exec_tb: The traceback of the exception being handled, if any.

    Returns:
        None

    Raises:
        NotImplementedError: If the method is not implemented.

    """
    super()._abc_close(exc_type, exc_val, exec_tb)

    for h in self.handlers.values():
        await h.close()

    if self._connection is not None:
        await self._close(exc_type, exc_val, exec_tb)

connect async #

connect(*args: Any, **kwargs: Any) -> Client
Source code in faststream/nats/broker.py
async def connect(
    self,
    *args: Any,
    **kwargs: Any,
) -> Client:
    connection = await super().connect(*args, **kwargs)
    for p in self._publishers.values():
        self.__set_publisher_producer(p)
    return connection

include_router #

include_router(router: BrokerRouter[Any, MsgType]) -> None

Includes a router in the current object.

PARAMETER DESCRIPTION
router

The router to be included.

TYPE: BrokerRouter[Any, MsgType]

RETURNS DESCRIPTION
None

None

Source code in faststream/broker/core/abc.py
def include_router(self, router: BrokerRouter[Any, MsgType]) -> None:
    """Includes a router in the current object.

    Args:
        router: The router to be included.

    Returns:
        None

    """
    for r in router._handlers:
        self.subscriber(*r.args, **r.kwargs)(r.call)

    self._publishers = {**self._publishers, **router._publishers}

include_routers #

include_routers(
    *routers: BrokerRouter[Any, MsgType]
) -> None

Includes routers in the current object.

PARAMETER DESCRIPTION
*routers

Variable length argument list of routers to include.

TYPE: BrokerRouter[Any, MsgType] DEFAULT: ()

RETURNS DESCRIPTION
None

None

Source code in faststream/broker/core/abc.py
def include_routers(self, *routers: BrokerRouter[Any, MsgType]) -> None:
    """Includes routers in the current object.

    Args:
        *routers: Variable length argument list of routers to include.

    Returns:
        None

    """
    for r in routers:
        self.include_router(r)

publish async #

publish(
    *args: Any, stream: str | None = None, **kwargs: Any
) -> DecodedMessage | None
Source code in faststream/nats/broker.py
@override
async def publish(  # type: ignore[override]
    self,
    *args: Any,
    stream: Optional[str] = None,
    **kwargs: Any,
) -> Optional[DecodedMessage]:
    if stream is None:
        assert self._producer, NOT_CONNECTED_YET  # nosec B101
        return await self._producer.publish(*args, **kwargs)
    else:
        assert self._js_producer, NOT_CONNECTED_YET  # nosec B101
        return await self._js_producer.publish(
            *args,
            stream=stream,
            **kwargs,  # type: ignore[misc]
        )

publisher #

publisher(
    subject: str,
    headers: dict[str, str] | None = None,
    reply_to: str = "",
    stream: str | JStream | None = None,
    timeout: float | None = None,
    title: str | None = None,
    description: str | None = None,
    schema: Any | None = None,
    include_in_schema: bool = True,
) -> Publisher
Source code in faststream/nats/broker.py
@override
def publisher(  # type: ignore[override]
    self,
    subject: str,
    headers: Optional[Dict[str, str]] = None,
    # Core
    reply_to: str = "",
    # JS
    stream: Union[str, JStream, None] = None,
    timeout: Optional[float] = None,
    # AsyncAPI information
    title: Optional[str] = None,
    description: Optional[str] = None,
    schema: Optional[Any] = None,
    include_in_schema: bool = True,
) -> Publisher:
    if (stream := stream_builder.stream(stream)) is not None:
        stream.subjects.append(subject)

    publisher = self._publishers.get(
        subject,
        Publisher(
            subject=subject,
            headers=headers,
            # Core
            reply_to=reply_to,
            # JS
            timeout=timeout,
            stream=stream,
            # AsyncAPI
            title=title,
            _description=description,
            _schema=schema,
            include_in_schema=include_in_schema,
        ),
    )
    super().publisher(subject, publisher)
    self.__set_publisher_producer(publisher)
    return publisher

start async #

start() -> None
Source code in faststream/nats/broker.py
async def start(self) -> None:
    context.set_global(
        "default_log_context",
        self._get_log_context(None, ""),
    )

    await super().start()
    assert self._connection  # nosec B101
    assert self.stream, "Broker should be started already"  # nosec B101

    for handler in self.handlers.values():
        stream = handler.stream

        if (is_js := stream is not None) and stream.declare:
            try:  # pragma: no branch
                await self.stream.add_stream(
                    config=stream.config,
                    subjects=stream.subjects,
                )

            except nats.js.errors.BadRequestError as e:
                old_config = (await self.stream.stream_info(stream.name)).config

                c = self._get_log_context(None, "")
                if (
                    e.description
                    == "stream name already in use with a different configuration"
                ):
                    self._log(str(e), logging.WARNING, c)
                    await self.stream.update_stream(
                        config=stream.config,
                        subjects=tuple(
                            set(old_config.subjects or ()).union(stream.subjects)
                        ),
                    )

                else:  # pragma: no cover
                    self._log(str(e), logging.ERROR, c, exc_info=e)

            finally:
                # prevent from double declaration
                stream.declare = False

        c = self._get_log_context(
            None,
            subject=handler.subject,
            queue=handler.queue,
            stream=stream.name if stream else "",
        )
        self._log(f"`{handler.call_name}` waiting for messages", extra=c)
        await handler.start(self.stream if is_js else self._connection)

subscriber #

subscriber(
    subject: str,
    queue: str = "",
    pending_msgs_limit: int | None = None,
    pending_bytes_limit: int | None = None,
    max_msgs: int = 0,
    durable: str | None = None,
    config: ConsumerConfig | None = None,
    ordered_consumer: bool = False,
    idle_heartbeat: float | None = None,
    flow_control: bool = False,
    deliver_policy: DeliverPolicy | None = None,
    headers_only: bool | None = None,
    pull_sub: PullSub | None = None,
    inbox_prefix: bytes = api.INBOX_PREFIX,
    ack_first: bool = False,
    stream: str | JStream | None = None,
    dependencies: Sequence[Depends] = (),
    parser: CustomParser[Msg, NatsMessage] | None = None,
    decoder: CustomDecoder[NatsMessage] | None = None,
    middlewares: Sequence[Callable[[Msg], BaseMiddleware]]
    | None = None,
    filter: Filter[NatsMessage] = default_filter,
    no_ack: bool = False,
    max_workers: int = 1,
    title: str | None = None,
    description: str | None = None,
    include_in_schema: bool = True,
    **original_kwargs: Any
) -> Callable[
    [Callable[P_HandlerParams, T_HandlerReturn]],
    HandlerCallWrapper[
        Msg, P_HandlerParams, T_HandlerReturn
    ],
]
Source code in faststream/nats/broker.py
@override
def subscriber(  # type: ignore[override]
    self,
    subject: str,
    queue: str = "",
    pending_msgs_limit: Optional[int] = None,
    pending_bytes_limit: Optional[int] = None,
    # Core arguments
    max_msgs: int = 0,
    # JS arguments
    durable: Optional[str] = None,
    config: Optional[api.ConsumerConfig] = None,
    ordered_consumer: bool = False,
    idle_heartbeat: Optional[float] = None,
    flow_control: bool = False,
    deliver_policy: Optional[api.DeliverPolicy] = None,
    headers_only: Optional[bool] = None,
    # pull arguments
    pull_sub: Optional[PullSub] = None,
    inbox_prefix: bytes = api.INBOX_PREFIX,
    # custom
    ack_first: bool = False,
    stream: Union[str, JStream, None] = None,
    # broker arguments
    dependencies: Sequence[Depends] = (),
    parser: Optional[CustomParser[Msg, NatsMessage]] = None,
    decoder: Optional[CustomDecoder[NatsMessage]] = None,
    middlewares: Optional[Sequence[Callable[[Msg], BaseMiddleware]]] = None,
    filter: Filter[NatsMessage] = default_filter,
    no_ack: bool = False,
    max_workers: int = 1,
    # AsyncAPI information
    title: Optional[str] = None,
    description: Optional[str] = None,
    include_in_schema: bool = True,
    **original_kwargs: Any,
) -> Callable[
    [Callable[P_HandlerParams, T_HandlerReturn]],
    HandlerCallWrapper[Msg, P_HandlerParams, T_HandlerReturn],
]:
    stream = stream_builder.stream(stream)

    if pull_sub is not None and stream is None:
        raise ValueError("Pull subscriber can be used only with a stream")

    self._setup_log_context(
        queue=queue,
        subject=subject,
        stream=stream.name if stream else None,
    )
    super().subscriber()

    extra_options: AnyDict = {
        "pending_msgs_limit": pending_msgs_limit
        or (
            DEFAULT_JS_SUB_PENDING_MSGS_LIMIT
            if stream
            else DEFAULT_SUB_PENDING_MSGS_LIMIT
        ),
        "pending_bytes_limit": pending_bytes_limit
        or (
            DEFAULT_JS_SUB_PENDING_BYTES_LIMIT
            if stream
            else DEFAULT_SUB_PENDING_BYTES_LIMIT
        ),
    }

    if stream:
        extra_options.update(
            {
                "durable": durable,
                "stream": stream.name,
                "config": config,
            }
        )

        if pull_sub is not None:
            extra_options.update({"inbox_prefix": inbox_prefix})

        else:
            extra_options.update(
                {
                    "ordered_consumer": ordered_consumer,
                    "idle_heartbeat": idle_heartbeat,
                    "flow_control": flow_control,
                    "deliver_policy": deliver_policy,
                    "headers_only": headers_only,
                    "manual_ack": not ack_first,
                }
            )

    else:
        extra_options.update(
            {
                "max_msgs": max_msgs,
            }
        )

    key = Handler.get_routing_hash(subject)
    handler = self.handlers[key] = self.handlers.get(
        key,
        Handler(
            subject=subject,
            queue=queue,
            stream=stream,
            pull_sub=pull_sub,
            extra_options=extra_options,
            title=title,
            description=description,
            include_in_schema=include_in_schema,
            graceful_timeout=self.graceful_timeout,
            max_workers=max_workers,
            log_context_builder=partial(
                self._get_log_context,
                stream=stream.name if stream else "",
                subject=subject,
                queue=queue,
            ),
        ),
    )

    if stream:
        stream.subjects.append(handler.subject)

    def consumer_wrapper(
        func: Callable[P_HandlerParams, T_HandlerReturn],
    ) -> HandlerCallWrapper[
        Msg,
        P_HandlerParams,
        T_HandlerReturn,
    ]:
        handler_call, dependant = self._wrap_handler(
            func,
            extra_dependencies=dependencies,
            no_ack=no_ack,
            **original_kwargs,
        )

        handler.add_call(
            handler=handler_call,
            filter=filter,
            middlewares=middlewares,
            parser=parser or self._global_parser,
            decoder=decoder or self._global_decoder,
            dependant=dependant,
        )

        return handler_call

    return consumer_wrapper