Skip to content

NatsBroker

faststream.nats.NatsBroker #

NatsBroker(servers=('nats://localhost:4222'), *, error_cb=None, disconnected_cb=None, closed_cb=None, discovered_server_cb=None, reconnected_cb=None, name=SERVICE_NAME, pedantic=False, verbose=False, allow_reconnect=True, connect_timeout=DEFAULT_CONNECT_TIMEOUT, reconnect_time_wait=DEFAULT_RECONNECT_TIME_WAIT, max_reconnect_attempts=DEFAULT_MAX_RECONNECT_ATTEMPTS, ping_interval=DEFAULT_PING_INTERVAL, max_outstanding_pings=DEFAULT_MAX_OUTSTANDING_PINGS, dont_randomize=False, flusher_queue_size=DEFAULT_MAX_FLUSHER_QUEUE_SIZE, no_echo=False, tls=None, tls_hostname=None, user=None, password=None, token=None, drain_timeout=DEFAULT_DRAIN_TIMEOUT, signature_cb=None, user_jwt_cb=None, user_credentials=None, nkeys_seed=None, nkeys_seed_str=None, inbox_prefix=DEFAULT_INBOX_PREFIX, pending_size=DEFAULT_PENDING_SIZE, flush_timeout=None, graceful_timeout=None, decoder=None, parser=None, dependencies=(), middlewares=(), security=None, asyncapi_url=None, protocol='nats', protocol_version='custom', description=None, tags=None, logger=EMPTY, log_level=INFO, log_fmt=None, apply_types=True, validate=True, _get_dependant=None, _call_decorators=())

Bases: NatsRegistrator, NatsLoggingBroker

A class to represent a NATS broker.

Initialize the NatsBroker object.

Source code in faststream/nats/broker/broker.py
def __init__(
    self,
    servers: Annotated[
        Union[str, Iterable[str]],
        Doc("NATS cluster addresses to connect."),
    ] = ("nats://localhost:4222",),
    *,
    error_cb: Annotated[
        Optional["ErrorCallback"],
        Doc("Callback to report errors."),
    ] = None,
    disconnected_cb: Annotated[
        Optional["Callback"],
        Doc("Callback to report disconnection from NATS."),
    ] = None,
    closed_cb: Annotated[
        Optional["Callback"],
        Doc("Callback to report when client stops reconnection to NATS."),
    ] = None,
    discovered_server_cb: Annotated[
        Optional["Callback"],
        Doc("Callback to report when a new server joins the cluster."),
    ] = None,
    reconnected_cb: Annotated[
        Optional["Callback"], Doc("Callback to report successful reconnection.")
    ] = None,
    name: Annotated[
        Optional[str],
        Doc("Label the connection with name (shown in NATS monitoring)."),
    ] = SERVICE_NAME,
    pedantic: Annotated[
        bool,
        Doc(
            "Turn on NATS server pedantic mode that performs extra checks on the protocol. "
            "https://docs.nats.io/using-nats/developer/connecting/misc#turn-on-pedantic-mode"
        ),
    ] = False,
    verbose: Annotated[
        bool,
        Doc("Verbose mode produce more feedback about code execution."),
    ] = False,
    allow_reconnect: Annotated[
        bool,
        Doc("Whether recover connection automatically or not."),
    ] = True,
    connect_timeout: Annotated[
        int,
        Doc("Timeout in seconds to establish connection with NATS server."),
    ] = DEFAULT_CONNECT_TIMEOUT,
    reconnect_time_wait: Annotated[
        int,
        Doc("Time in seconds to wait for reestablish connection to NATS server"),
    ] = DEFAULT_RECONNECT_TIME_WAIT,
    max_reconnect_attempts: Annotated[
        int,
        Doc("Maximum attempts number to reconnect to NATS server."),
    ] = DEFAULT_MAX_RECONNECT_ATTEMPTS,
    ping_interval: Annotated[
        int,
        Doc("Interval in seconds to ping."),
    ] = DEFAULT_PING_INTERVAL,
    max_outstanding_pings: Annotated[
        int,
        Doc("Maximum number of failed pings"),
    ] = DEFAULT_MAX_OUTSTANDING_PINGS,
    dont_randomize: Annotated[
        bool,
        Doc(
            "Boolean indicating should client randomly shuffle servers list for reconnection randomness."
        ),
    ] = False,
    flusher_queue_size: Annotated[
        int, Doc("Max count of commands awaiting to be flushed to the socket")
    ] = DEFAULT_MAX_FLUSHER_QUEUE_SIZE,
    no_echo: Annotated[
        bool,
        Doc("Boolean indicating should commands be echoed."),
    ] = False,
    tls: Annotated[
        Optional["ssl.SSLContext"],
        Doc("Some SSL context to make NATS connections secure."),
    ] = None,
    tls_hostname: Annotated[
        Optional[str],
        Doc("Hostname for TLS."),
    ] = None,
    user: Annotated[
        Optional[str],
        Doc("Username for NATS auth."),
    ] = None,
    password: Annotated[
        Optional[str],
        Doc("Username password for NATS auth."),
    ] = None,
    token: Annotated[
        Optional[str],
        Doc("Auth token for NATS auth."),
    ] = None,
    drain_timeout: Annotated[
        int,
        Doc("Timeout in seconds to drain subscriptions."),
    ] = DEFAULT_DRAIN_TIMEOUT,
    signature_cb: Annotated[
        Optional["SignatureCallback"],
        Doc(
            "A callback used to sign a nonce from the server while "
            "authenticating with nkeys. The user should sign the nonce and "
            "return the base64 encoded signature."
        ),
    ] = None,
    user_jwt_cb: Annotated[
        Optional["JWTCallback"],
        Doc(
            "A callback used to fetch and return the account "
            "signed JWT for this user."
        ),
    ] = None,
    user_credentials: Annotated[
        Optional["Credentials"],
        Doc("A user credentials file or tuple of files."),
    ] = None,
    nkeys_seed: Annotated[
        Optional[str],
        Doc("Path-like object containing nkeys seed that will be used."),
    ] = None,
    nkeys_seed_str: Annotated[
        Optional[str],
        Doc("Raw nkeys seed to be used."),
    ] = None,
    inbox_prefix: Annotated[
        Union[str, bytes],
        Doc(
            "Prefix for generating unique inboxes, subjects with that prefix and NUID."
        ),
    ] = DEFAULT_INBOX_PREFIX,
    pending_size: Annotated[
        int,
        Doc("Max size of the pending buffer for publishing commands."),
    ] = DEFAULT_PENDING_SIZE,
    flush_timeout: Annotated[
        Optional[float],
        Doc("Max duration to wait for a forced flush to occur."),
    ] = None,
    # broker args
    graceful_timeout: Annotated[
        Optional[float],
        Doc(
            "Graceful shutdown timeout. Broker waits for all running subscribers completion before shut down."
        ),
    ] = None,
    decoder: Annotated[
        Optional["CustomCallable"],
        Doc("Custom decoder object."),
    ] = None,
    parser: Annotated[
        Optional["CustomCallable"],
        Doc("Custom parser object."),
    ] = None,
    dependencies: Annotated[
        Iterable["Depends"],
        Doc("Dependencies to apply to all broker subscribers."),
    ] = (),
    middlewares: Annotated[
        Sequence["BrokerMiddleware[Msg]"],
        Doc("Middlewares to apply to all broker publishers/subscribers."),
    ] = (),
    # AsyncAPI args
    security: Annotated[
        Optional["BaseSecurity"],
        Doc(
            "Security options to connect broker and generate AsyncAPI server security information."
        ),
    ] = None,
    asyncapi_url: Annotated[
        Union[str, Iterable[str], None],
        Doc("AsyncAPI hardcoded server addresses. Use `servers` if not specified."),
    ] = None,
    protocol: Annotated[
        Optional[str],
        Doc("AsyncAPI server protocol."),
    ] = "nats",
    protocol_version: Annotated[
        Optional[str],
        Doc("AsyncAPI server protocol version."),
    ] = "custom",
    description: Annotated[
        Optional[str],
        Doc("AsyncAPI server description."),
    ] = None,
    tags: Annotated[
        Optional[Iterable[Union["asyncapi.Tag", "asyncapi.TagDict"]]],
        Doc("AsyncAPI server tags."),
    ] = None,
    # logging args
    logger: Annotated[
        Optional["LoggerProto"],
        Doc("User specified logger to pass into Context and log service messages."),
    ] = EMPTY,
    log_level: Annotated[
        int,
        Doc("Service messages log level."),
    ] = logging.INFO,
    log_fmt: Annotated[
        Optional[str],
        Doc("Default logger log format."),
    ] = None,
    # FastDepends args
    apply_types: Annotated[
        bool,
        Doc("Whether to use FastDepends or not."),
    ] = True,
    validate: Annotated[
        bool,
        Doc("Whether to cast types using Pydantic validation."),
    ] = True,
    _get_dependant: Annotated[
        Optional[Callable[..., Any]],
        Doc("Custom library dependant generator callback."),
    ] = None,
    _call_decorators: Annotated[
        Iterable["Decorator"],
        Doc("Any custom decorator to apply to wrapped functions."),
    ] = (),
) -> None:
    """Initialize the NatsBroker object.

    Every parameter is documented by its own `Doc(...)` annotation. On top of
    forwarding the options to the parent initializer, this method:

    * emits a `DeprecationWarning` when `tls`, `user` or `password` is set;
    * merges `tls`/`user`/`password` with the options produced by
      `parse_security(security)`;
    * normalizes `servers` (and `asyncapi_url`, when given) to a `list`,
      falling back to `servers` if `asyncapi_url` is `None`;
    * wraps `error_cb` and `reconnected_cb` with `_log_connection_broken` and
      `_log_reconnected` before passing them on;
    * resets the connection-related attributes (`_producer`, `stream`,
      `_js_producer`, `_kv_declarer`, `_os_declarer`) to `None`.
    """
    if tls:  # pragma: no cover
        warnings.warn(
            (
                "\nNATS `tls` option was deprecated and will be removed in 0.6.0"
                "\nPlease, use `security` with `BaseSecurity` or `SASLPlaintext` instead"
            ),
            DeprecationWarning,
            stacklevel=2,
        )

    if user or password:
        warnings.warn(
            (
                "\nNATS `user` and `password` options were deprecated and will be removed in 0.6.0"
                "\nPlease, use `security` with `SASLPlaintext` instead"
            ),
            DeprecationWarning,
            stacklevel=2,
        )

    # `parse_security(...)` is unpacked last, so any key it returns overrides
    # the explicitly passed (deprecated) `tls` / `user` / `password` values.
    secure_kwargs = {
        "tls": tls,
        "user": user,
        "password": password,
        **parse_security(security),
    }

    # A bare string is a single address, not an iterable of characters.
    servers = [servers] if isinstance(servers, str) else list(servers)

    if asyncapi_url is None:
        # No hardcoded AsyncAPI addresses: document the real servers instead.
        asyncapi_url = servers
    elif isinstance(asyncapi_url, str):
        asyncapi_url = [asyncapi_url]
    else:
        asyncapi_url = list(asyncapi_url)

    super().__init__(
        # NATS options
        servers=servers,
        name=name,
        verbose=verbose,
        allow_reconnect=allow_reconnect,
        reconnect_time_wait=reconnect_time_wait,
        max_reconnect_attempts=max_reconnect_attempts,
        no_echo=no_echo,
        pedantic=pedantic,
        inbox_prefix=inbox_prefix,
        pending_size=pending_size,
        connect_timeout=connect_timeout,
        drain_timeout=drain_timeout,
        flush_timeout=flush_timeout,
        ping_interval=ping_interval,
        max_outstanding_pings=max_outstanding_pings,
        dont_randomize=dont_randomize,
        flusher_queue_size=flusher_queue_size,
        # security
        tls_hostname=tls_hostname,
        token=token,
        user_credentials=user_credentials,
        nkeys_seed=nkeys_seed,
        nkeys_seed_str=nkeys_seed_str,
        **secure_kwargs,
        # callbacks
        error_cb=self._log_connection_broken(error_cb),
        reconnected_cb=self._log_reconnected(reconnected_cb),
        disconnected_cb=disconnected_cb,
        closed_cb=closed_cb,
        discovered_server_cb=discovered_server_cb,
        signature_cb=signature_cb,
        user_jwt_cb=user_jwt_cb,
        # Basic args
        # broker base
        graceful_timeout=graceful_timeout,
        dependencies=dependencies,
        decoder=decoder,
        parser=parser,
        middlewares=middlewares,
        # AsyncAPI
        description=description,
        asyncapi_url=asyncapi_url,
        protocol=protocol,
        protocol_version=protocol_version,
        security=security,
        tags=tags,
        # logging
        logger=logger,
        log_level=log_level,
        log_fmt=log_fmt,
        # FastDepends args
        apply_types=apply_types,
        validate=validate,
        _get_dependant=_get_dependant,
        _call_decorators=_call_decorators,
    )

    # Nothing is connected yet; `start()` asserts that `stream` and
    # `_producer` have been populated before it uses them.
    self.__is_connected = False
    self._producer = None

    # JS options
    self.stream = None
    self._js_producer = None
    self._kv_declarer = None
    self._os_declarer = None

prefix instance-attribute #

prefix = prefix

include_in_schema instance-attribute #

include_in_schema = include_in_schema

logger instance-attribute #

logger

use_custom instance-attribute #

use_custom = True

running instance-attribute #

running = False

graceful_timeout instance-attribute #

graceful_timeout = graceful_timeout

protocol instance-attribute #

protocol = protocol

protocol_version instance-attribute #

protocol_version = protocol_version

description instance-attribute #

description = description

tags instance-attribute #

tags = tags

security instance-attribute #

security = security

url instance-attribute #

url

stream instance-attribute #

stream = None

setup #

setup()

Prepare all Broker entities to startup.

Source code in faststream/broker/core/usecase.py
def setup(self) -> None:
    """Prepare all Broker entities to startup."""
    for h in self._subscribers.values():
        self.setup_subscriber(h)

    for p in self._publishers.values():
        self.setup_publisher(p)

add_middleware #

add_middleware(middleware)

Append BrokerMiddleware to the end of middlewares list.

The new middleware becomes the innermost of the already registered ones.

Source code in faststream/broker/core/abc.py
def add_middleware(self, middleware: "BrokerMiddleware[MsgType]") -> None:
    """Register one more BrokerMiddleware after the existing ones.

    The middleware is appended to the broker's own middlewares tuple and is
    also handed to every already registered subscriber and publisher, so it
    ends up as the innermost of the middlewares registered so far.
    """
    # Build a fresh tuple rather than mutating the stored sequence in place.
    self._middlewares = tuple(self._middlewares) + (middleware,)

    for subscriber in self._subscribers.values():
        subscriber.add_middleware(middleware)

    for publisher in self._publishers.values():
        publisher.add_middleware(middleware)

subscriber #

subscriber(subject='', queue='', pending_msgs_limit=None, pending_bytes_limit=None, max_msgs=0, durable=None, config=None, ordered_consumer=False, idle_heartbeat=None, flow_control=None, deliver_policy=None, headers_only=None, pull_sub=False, kv_watch=None, obj_watch=False, inbox_prefix=INBOX_PREFIX, ack_first=False, stream=None, dependencies=(), parser=None, decoder=None, middlewares=(), filter=default_filter, max_workers=1, retry=False, no_ack=False, no_reply=False, title=None, description=None, include_in_schema=True)

Creates NATS subscriber object.

You can use it as a handler decorator @broker.subscriber(...).

Source code in faststream/nats/broker/registrator.py
@override
def subscriber(  # type: ignore[override]
    self,
    subject: Annotated[
        str,
        Doc("NATS subject to subscribe."),
    ] = "",
    queue: Annotated[
        str,
        Doc(
            "Subscribers' NATS queue name. Subscribers with same queue name will be load balanced by the NATS "
            "server."
        ),
    ] = "",
    pending_msgs_limit: Annotated[
        Optional[int],
        Doc(
            "Limit of messages, considered by NATS server as possible to be delivered to the client without "
            "been answered. In case of NATS Core, if that limits exceeds, you will receive NATS 'Slow Consumer' "
            "error. "
            "That's literally means that your worker can't handle the whole load. In case of NATS JetStream, "
            "you will no longer receive messages until some of delivered messages will be acked in any way."
        ),
    ] = None,
    pending_bytes_limit: Annotated[
        Optional[int],
        Doc(
            "The number of bytes, considered by NATS server as possible to be delivered to the client without "
            "been answered. In case of NATS Core, if that limit exceeds, you will receive NATS 'Slow Consumer' "
            "error. "
            "That's literally means that your worker can't handle the whole load. In case of NATS JetStream, "
            "you will no longer receive messages until some of delivered messages will be acked in any way."
        ),
    ] = None,
    # Core arguments
    max_msgs: Annotated[
        int,
        Doc("Consuming messages limiter. Automatically disconnect if reached."),
    ] = 0,
    # JS arguments
    durable: Annotated[
        Optional[str],
        Doc(
            "Name of the durable consumer to which the subscription should be bound."
        ),
    ] = None,
    config: Annotated[
        Optional["api.ConsumerConfig"],
        Doc("Configuration of JetStream consumer to be subscribed with."),
    ] = None,
    ordered_consumer: Annotated[
        bool,
        Doc("Enable ordered consumer mode."),
    ] = False,
    idle_heartbeat: Annotated[
        Optional[float],
        Doc("Enable Heartbeats for a consumer to detect failures."),
    ] = None,
    flow_control: Annotated[
        Optional[bool],
        Doc("Enable Flow Control for a consumer."),
    ] = None,
    deliver_policy: Annotated[
        Optional["api.DeliverPolicy"],
        Doc("Deliver Policy to be used for subscription."),
    ] = None,
    headers_only: Annotated[
        Optional[bool],
        Doc(
            "Should be message delivered without payload, only headers and metadata."
        ),
    ] = None,
    # pull arguments
    pull_sub: Annotated[
        Union[bool, "PullSub"],
        Doc(
            "NATS Pull consumer parameters container. "
            "Should be used with `stream` only."
        ),
    ] = False,
    kv_watch: Annotated[
        Union[str, "KvWatch", None],
        Doc("KeyValue watch parameters container."),
    ] = None,
    obj_watch: Annotated[
        Union[bool, "ObjWatch"],
        Doc("ObjectStore watch parameters container."),
    ] = False,
    inbox_prefix: Annotated[
        bytes,
        Doc(
            "Prefix for generating unique inboxes, subjects with that prefix and NUID."
        ),
    ] = api.INBOX_PREFIX,
    # custom
    ack_first: Annotated[
        bool,
        Doc("Whether to `ack` message at start of consuming or not."),
    ] = False,
    stream: Annotated[
        Union[str, "JStream", None],
        Doc("Subscribe to NATS Stream with `subject` filter."),
    ] = None,
    # broker arguments
    dependencies: Annotated[
        Iterable["Depends"],
        Doc("Dependencies list (`[Depends(),]`) to apply to the subscriber."),
    ] = (),
    parser: Annotated[
        Optional["CustomCallable"],
        Doc("Parser to map original **nats-py** Msg to FastStream one."),
    ] = None,
    decoder: Annotated[
        Optional["CustomCallable"],
        Doc("Function to decode FastStream msg bytes body to python objects."),
    ] = None,
    middlewares: Annotated[
        Sequence["SubscriberMiddleware[NatsMessage]"],
        Doc("Subscriber middlewares to wrap incoming message processing."),
    ] = (),
    filter: Annotated[
        Union[
            "Filter[NatsMessage]",
            "Filter[NatsBatchMessage]",
        ],
        Doc(
            "Overload subscriber to consume various messages from the same source."
        ),
        deprecated(
            "Deprecated in **FastStream 0.5.0**. "
            "Please, create `subscriber` object and use it explicitly instead. "
            "Argument will be removed in **FastStream 0.6.0**."
        ),
    ] = default_filter,
    max_workers: Annotated[
        int,
        Doc("Number of workers to process messages concurrently."),
    ] = 1,
    retry: Annotated[
        bool,
        Doc("Whether to `nack` message at processing exception."),
    ] = False,
    no_ack: Annotated[
        bool,
        Doc("Whether to disable **FastStream** autoacknowledgement logic or not."),
    ] = False,
    no_reply: Annotated[
        bool,
        Doc(
            "Whether to disable **FastStream** RPC and Reply To auto responses or not."
        ),
    ] = False,
    # AsyncAPI information
    title: Annotated[
        Optional[str],
        Doc("AsyncAPI subscriber object title."),
    ] = None,
    description: Annotated[
        Optional[str],
        Doc(
            "AsyncAPI subscriber object description. "
            "Uses decorated docstring as default."
        ),
    ] = None,
    include_in_schema: Annotated[
        bool,
        Doc("Whether to include operation in AsyncAPI schema or not."),
    ] = True,
) -> AsyncAPISubscriber:
    """Creates NATS subscriber object.

    You can use it as a handler decorator `@broker.subscriber(...)`.

    Every parameter is documented by its own `Doc(...)` annotation. The
    subscriber is built by `create_subscriber(...)`, registered through the
    parent `subscriber(...)` implementation and, when a stream is given, its
    subject is added to that stream.

    Returns:
        The result of `add_call(...)` on the registered subscriber, with the
        broker-level parser/decoder used when `parser` / `decoder` is falsy.
    """
    # Resolve the `stream` argument (name, `JStream` or `None`) through the
    # shared stream builder.
    stream = self._stream_builder.create(stream)

    subscriber = cast(
        AsyncAPISubscriber,
        super().subscriber(
            create_subscriber(
                subject=subject,
                queue=queue,
                stream=stream,
                pull_sub=PullSub.validate(pull_sub),
                kv_watch=KvWatch.validate(kv_watch),
                obj_watch=ObjWatch.validate(obj_watch),
                max_workers=max_workers,
                # extra args
                pending_msgs_limit=pending_msgs_limit,
                pending_bytes_limit=pending_bytes_limit,
                max_msgs=max_msgs,
                durable=durable,
                config=config,
                ordered_consumer=ordered_consumer,
                idle_heartbeat=idle_heartbeat,
                flow_control=flow_control,
                deliver_policy=deliver_policy,
                headers_only=headers_only,
                inbox_prefix=inbox_prefix,
                ack_first=ack_first,
                # subscriber args
                no_ack=no_ack,
                no_reply=no_reply,
                retry=retry,
                broker_middlewares=self._middlewares,
                broker_dependencies=self._dependencies,
                # AsyncAPI
                title_=title,
                description_=description,
                include_in_schema=self._solve_include_in_schema(include_in_schema),
            )
        ),
    )

    # The subject is read back from the registered subscriber rather than
    # taken from the `subject` argument before it is added to the stream.
    if stream and subscriber.subject:
        stream.add_subject(subscriber.subject)

    return subscriber.add_call(
        filter_=filter,
        parser_=parser or self._parser,
        decoder_=decoder or self._decoder,
        dependencies_=dependencies,
        middlewares_=middlewares,
    )

publisher #

publisher(subject, *, headers=None, reply_to='', stream=None, timeout=None, middlewares=(), title=None, description=None, schema=None, include_in_schema=True)

Creates long-living and AsyncAPI-documented publisher object.

You can use it as a handler decorator (handler should be decorated by @broker.subscriber(...) too) - @broker.publisher(...). In such case publisher will publish your handler return value.

Or you can create a publisher object and call it later - broker.publisher(...).publish(...).

Source code in faststream/nats/broker/registrator.py
@override
def publisher(  # type: ignore[override]
    self,
    subject: Annotated[
        str,
        Doc("NATS subject to send message."),
    ],
    *,
    headers: Annotated[
        Optional[Dict[str, str]],
        Doc(
            "Message headers to store metainformation. "
            "**content-type** and **correlation_id** will be set automatically by framework anyway. "
            "Can be overridden by `publish.headers` if specified."
        ),
    ] = None,
    reply_to: Annotated[
        str,
        Doc("NATS subject name to send response."),
    ] = "",
    # JS
    stream: Annotated[
        Union[str, "JStream", None],
        Doc(
            "This option validates that the target `subject` is in presented stream. "
            "Can be omitted without any effect."
        ),
    ] = None,
    timeout: Annotated[
        Optional[float],
        Doc("Timeout to send message to NATS."),
    ] = None,
    # basic args
    middlewares: Annotated[
        Sequence["PublisherMiddleware"],
        Doc("Publisher middlewares to wrap outgoing messages."),
    ] = (),
    # AsyncAPI information
    title: Annotated[
        Optional[str],
        Doc("AsyncAPI publisher object title."),
    ] = None,
    description: Annotated[
        Optional[str],
        Doc("AsyncAPI publisher object description."),
    ] = None,
    schema: Annotated[
        Optional[Any],
        Doc(
            "AsyncAPI publishing message type. "
            "Should be any python-native object annotation or `pydantic.BaseModel`."
        ),
    ] = None,
    include_in_schema: Annotated[
        bool,
        Doc("Whetever to include operation in AsyncAPI schema or not."),
    ] = True,
) -> "AsyncAPIPublisher":
    """Create a long-living, AsyncAPI-documented publisher object.

    Use it as a handler decorator, `@broker.publisher(...)`, on a handler that
    is also decorated by `@broker.subscriber(...)`; the publisher then
    publishes the handler's return value.

    Alternatively, keep the returned object and call it later:
    `broker.publisher(...).publish(...)`.

    The publisher is registered through the parent `publisher(...)`
    implementation and, when a stream is given, its subject is added to that
    stream.
    """
    # Resolve the `stream` argument through the shared stream builder.
    js_stream = self._stream_builder.create(stream)

    registered = super().publisher(
        publisher=AsyncAPIPublisher.create(
            subject=subject,
            headers=headers,
            # Core
            reply_to=reply_to,
            # JS
            timeout=timeout,
            stream=js_stream,
            # Specific
            broker_middlewares=self._middlewares,
            middlewares=middlewares,
            # AsyncAPI
            title_=title,
            description_=description,
            schema_=schema,
            include_in_schema=self._solve_include_in_schema(include_in_schema),
        )
    )
    nats_publisher = cast(AsyncAPIPublisher, registered)

    # The subject is read back from the registered publisher before it is
    # added to the stream.
    if js_stream and nats_publisher.subject:
        js_stream.add_subject(nats_publisher.subject)

    return nats_publisher

include_router #

include_router(router, *, prefix='', dependencies=(), middlewares=(), include_in_schema=None)
Source code in faststream/nats/broker/registrator.py
@override
def include_router(  # type: ignore[override]
    self,
    router: "NatsRegistrator",
    *,
    prefix: str = "",
    dependencies: Iterable["Depends"] = (),
    middlewares: Sequence["BrokerMiddleware[Msg]"] = (),
    include_in_schema: Optional[bool] = None,
) -> None:
    """Include `router` into this registrator, merging its streams.

    The router's stream objects are copied into this registrator's stream
    builder. Stream subjects that belong to one of the router's subscribers
    are prefixed with `self.prefix`; all other subjects are kept as they are.
    The router itself is then included through the parent implementation.
    """
    # `.copy()` is presumably a shallow dict copy, so the stream objects
    # rewritten below are the same ones `router` holds - verify if it matters.
    router_streams = router._stream_builder.objects.copy()

    router_subjects = [sub.subject for sub in router._subscribers.values()]

    # NOTE(review): only `self.prefix` is applied here; the `prefix` argument
    # is forwarded to the parent call but not used for stream subjects -
    # confirm this is intended.
    for js_stream in router_streams.values():
        js_stream.subjects = [
            "".join((self.prefix, subject)) if subject in router_subjects else subject
            for subject in js_stream.subjects
        ]

    self._stream_builder.objects.update(router_streams)

    return super().include_router(
        router,
        prefix=prefix,
        dependencies=dependencies,
        middlewares=middlewares,
        include_in_schema=include_in_schema,
    )

include_routers #

include_routers(*routers)

Includes routers in the object.

Source code in faststream/broker/core/abc.py
def include_routers(
    self,
    *routers: "ABCBroker[MsgType]",
) -> None:
    """Include several routers at once.

    Each router is passed to `include_router` in the order given, with no
    extra arguments.
    """
    for router in routers:
        self.include_router(router)

get_fmt #

get_fmt()

Fallback method to get the log format if log_fmt is not specified.

Source code in faststream/nats/broker/logging.py
def get_fmt(self) -> str:
    """Fallback method to get log format if `log_fmt` is not specified.

    The format always contains the time, level, subject, message id and
    message columns. The stream and queue columns are included only when the
    corresponding maximum length is truthy.
    """
    stream_column = (
        f"%(stream)-{self._max_stream_len}s | " if self._max_stream_len else ""
    )
    queue_column = (
        f"%(queue)-{self._max_queue_len}s | " if self._max_queue_len else ""
    )

    return "".join(
        (
            "%(asctime)s %(levelname)-8s - ",
            stream_column,
            queue_column,
            f"%(subject)-{self._max_subject_len}s | ",
            f"%(message_id)-{self.__max_msg_id_ln}s - ",
            "%(message)s",
        )
    )

close async #

close(exc_type=None, exc_val=None, exc_tb=None)

Closes the object.

Source code in faststream/broker/core/usecase.py
async def close(
    self,
    exc_type: Optional[Type[BaseException]] = None,
    exc_val: Optional[BaseException] = None,
    exc_tb: Optional["TracebackType"] = None,
) -> None:
    """Stop the broker: close its subscribers, then its connection.

    `running` is set to `False`, every registered subscriber is closed one
    after another, and finally - only if a connection exists - `_close` is
    awaited with the given exception details.
    """
    self.running = False

    for subscriber in self._subscribers.values():
        await subscriber.close()

    # Nothing to tear down when the broker never connected.
    if self._connection is None:
        return

    await self._close(exc_type, exc_val, exc_tb)

connect async #

connect(servers=EMPTY, **kwargs)

Connect broker object to NATS cluster.

To start the subscribers as well, call broker.start() after, or instead of, this method.

Source code in faststream/nats/broker/broker.py
@override
async def connect(  # type: ignore[override]
    self,
    servers: Annotated[
        Union[str, Iterable[str]],
        Doc("NATS cluster addresses to connect."),
    ] = EMPTY,
    **kwargs: "Unpack[NatsInitKwargs]",
) -> "Client":
    """Connect broker object to NATS cluster.

    To start the subscribers as well, call `broker.start()` after, or instead
    of, this method.

    `servers` is forwarded to the parent `connect` only when it was passed
    explicitly; all other keyword arguments are forwarded unchanged.
    """
    connect_kwargs: AnyDict = dict(kwargs)

    # `EMPTY` is the "not provided" sentinel: omit the key in that case.
    if servers is not EMPTY:
        connect_kwargs["servers"] = servers

    return await super().connect(**connect_kwargs)

start async #

start()

Connect broker to NATS cluster and startup all subscribers.

Source code in faststream/nats/broker/broker.py
async def start(self) -> None:
    """Connect broker to NATS cluster and startup all subscribers.

    After the parent `start()` completes, every stream whose `declare` flag
    is truthy is added through `self.stream.add_stream(...)`. A
    `BadRequestError` is never re-raised: if its description says the stream
    name is already in use with a different configuration, the error is
    logged as a warning and the stream is updated with the union of its
    existing and new subjects; any other description is logged as an error.
    Each processed stream gets `declare = False`, and finally all registered
    subscribers are started.
    """
    await super().start()

    assert self._connection  # nosec B101
    assert self.stream, "Broker should be started already"  # nosec B101
    assert self._producer, "Broker should be started already"  # nosec B101

    for js_stream in self._stream_builder.objects.values():
        if not js_stream.declare:
            continue

        try:
            await self.stream.add_stream(
                config=js_stream.config,
                subjects=js_stream.subjects,
            )

        except BadRequestError as err:  # noqa: PERF203
            log_context = AsyncAPISubscriber.build_log_context(
                message=None,
                subject="",
                queue="",
                stream=js_stream.name,
            )

            if (
                err.description
                == "stream name already in use with a different configuration"
            ):
                stream_info = await self.stream.stream_info(js_stream.name)
                old_config = stream_info.config

                self._log(str(err), logging.WARNING, log_context)

                # Keep the subjects already on the stream and add the new ones.
                merged_subjects = tuple(
                    set(old_config.subjects or ()).union(js_stream.subjects)
                )
                await self.stream.update_stream(
                    config=js_stream.config,
                    subjects=merged_subjects,
                )

            else:  # pragma: no cover
                self._log(str(err), logging.ERROR, log_context, exc_info=err)

        finally:
            # prevent from double declaration
            js_stream.declare = False

    # TODO: filter by already running handlers after TestClient refactor
    for subscriber in self._subscribers.values():
        self._log(
            f"`{subscriber.call_name}` waiting for messages",
            extra=subscriber.get_log_context(None),
        )
        await subscriber.start()

publish async #

publish(message, subject, headers=None, reply_to='', correlation_id=None, stream=None, timeout=None, *, rpc=False, rpc_timeout=30.0, raise_timeout=False)

Publish message directly.

This method lets you publish a message without documenting it in AsyncAPI. You can use it in applications built on other frameworks, or to publish messages only occasionally.

For regular publishing, please use @broker.publisher(...) or broker.publisher(...).publish(...) instead.

Source code in faststream/nats/broker/broker.py
@override
async def publish(  # type: ignore[override]
    self,
    message: Annotated[
        "SendableMessage",
        Doc(
            "Message body to send. "
            "Can be any encodable object (native python types or `pydantic.BaseModel`)."
        ),
    ],
    subject: Annotated[
        str,
        Doc("NATS subject to send message."),
    ],
    headers: Annotated[
        Optional[Dict[str, str]],
        Doc(
            "Message headers to store metainformation. "
            "**content-type** and **correlation_id** will be set automatically by framework anyway."
        ),
    ] = None,
    reply_to: Annotated[
        str,
        Doc("NATS subject name to send response."),
    ] = "",
    correlation_id: Annotated[
        Optional[str],
        Doc(
            "Manual message **correlation_id** setter. "
            "**correlation_id** is a useful option to trace messages."
        ),
    ] = None,
    stream: Annotated[
        Optional[str],
        Doc(
            "This option validates that the target subject is in presented stream. "
            "Can be omitted without any effect."
        ),
    ] = None,
    timeout: Annotated[
        Optional[float],
        Doc("Timeout to send message to NATS."),
    ] = None,
    *,
    rpc: Annotated[
        bool,
        Doc("Whether to wait for reply in blocking mode."),
        deprecated(
            "Deprecated in **FastStream 0.5.17**. "
            "Please, use `request` method instead. "
            "Argument will be removed in **FastStream 0.6.0**."
        ),
    ] = False,
    rpc_timeout: Annotated[
        Optional[float],
        Doc("RPC reply waiting time."),
        deprecated(
            "Deprecated in **FastStream 0.5.17**. "
            "Please, use `request` method with `timeout` instead. "
            "Argument will be removed in **FastStream 0.6.0**."
        ),
    ] = 30.0,
    raise_timeout: Annotated[
        bool,
        Doc(
            "Whetever to raise `TimeoutError` or return `None` at **rpc_timeout**. "
            "RPC request returns `None` at timeout by default."
        ),
        deprecated(
            "Deprecated in **FastStream 0.5.17**. "
            "`request` always raises TimeoutError instead. "
            "Argument will be removed in **FastStream 0.6.0**."
        ),
    ] = False,
) -> Optional["DecodedMessage"]:
    """Publish a message directly, bypassing the AsyncAPI-documented publishers.

    Useful from applications built with other frameworks or for occasional,
    ad-hoc publishing. For regular use prefer `@broker.publisher(...)` or
    `broker.publisher(...).publish(...)`.

    The producer is selected by `stream`: when it is `None` the plain
    producer (`self._producer`) is used; otherwise the JetStream producer
    (`self._js_producer`) is used and `stream` and `timeout` are forwarded
    to it as well. When `correlation_id` is falsy a new one is generated.

    Returns whatever the parent class's `publish` returns.
    """
    publish_kwargs = {
        "subject": subject,
        "headers": headers,
        "reply_to": reply_to,
        "rpc": rpc,
        "rpc_timeout": rpc_timeout,
        "raise_timeout": raise_timeout,
    }

    producer: Optional[ProducerProto]
    if stream is not None:
        # `stream` and `timeout` are only forwarded on the JetStream path.
        producer = self._js_producer
        publish_kwargs["stream"] = stream
        publish_kwargs["timeout"] = timeout
    else:
        producer = self._producer

    return await super().publish(
        message,
        producer=producer,
        correlation_id=correlation_id or gen_cor_id(),
        **publish_kwargs,
    )

request async #

request(message, subject, headers=None, correlation_id=None, stream=None, timeout=0.5)
Source code in faststream/nats/broker/broker.py
@override
async def request(  # type: ignore[override]
    self,
    message: Annotated[
        "SendableMessage",
        Doc(
            "Message body to send. "
            "Can be any encodable object (native python types or `pydantic.BaseModel`)."
        ),
    ],
    subject: Annotated[
        str,
        Doc("NATS subject to send message."),
    ],
    headers: Annotated[
        Optional[Dict[str, str]],
        Doc(
            "Message headers to store metainformation. "
            "**content-type** and **correlation_id** will be set automatically by framework anyway."
        ),
    ] = None,
    correlation_id: Annotated[
        Optional[str],
        Doc(
            "Manual message **correlation_id** setter. "
            "**correlation_id** is a useful option to trace messages."
        ),
    ] = None,
    stream: Annotated[
        Optional[str],
        Doc(
            "This option validates that the target subject is in presented stream. "
            "Can be omitted without any effect."
        ),
    ] = None,
    timeout: Annotated[
        float,
        Doc("Timeout to send message to NATS."),
    ] = 0.5,
) -> "NatsMessage":
    """Send a request and return the message produced by the parent `request`.

    The producer is selected by `stream`: when it is `None` the plain
    producer (`self._producer`) is used; otherwise the JetStream producer
    (`self._js_producer`) is used and `stream` is forwarded to it.
    `timeout` is forwarded on both paths. When `correlation_id` is falsy a
    new one is generated.
    """
    publish_kwargs = {
        "subject": subject,
        "headers": headers,
        "timeout": timeout,
    }

    producer: Optional[ProducerProto]
    if stream is not None:
        # Only the JetStream path receives the stream name.
        producer = self._js_producer
        publish_kwargs["stream"] = stream
    else:
        producer = self._producer

    response: NatsMessage = await super().request(
        message,
        producer=producer,
        correlation_id=correlation_id or gen_cor_id(),
        **publish_kwargs,
    )
    return response

setup_subscriber #

setup_subscriber(subscriber)
Source code in faststream/nats/broker/broker.py
@override
def setup_subscriber(  # type: ignore[override]
    self,
    subscriber: "AsyncAPISubscriber",
) -> None:
    """Choose the connection object for `subscriber` and delegate to the parent.

    The subscriber is probed for the attributes `kv_watch`, `obj_watch` and
    `stream`, in that order; the first truthy one selects
    `self._kv_declarer`, `self._os_declarer` or `self.stream` respectively.
    If none is set, `self._connection` is used. The chosen object is passed
    to the parent `setup_subscriber` as `connection`.
    """

    def _is_set(attr_name: str) -> bool:
        # A missing attribute is treated the same as a falsy one.
        return bool(getattr(subscriber, attr_name, None))

    target: Union[
        Client,
        JetStreamContext,
        KVBucketDeclarer,
        OSBucketDeclarer,
        None,
    ]

    if _is_set("kv_watch"):
        target = self._kv_declarer
    elif _is_set("obj_watch"):
        target = self._os_declarer
    elif _is_set("stream"):
        target = self.stream
    else:
        target = self._connection

    return super().setup_subscriber(
        subscriber,
        connection=target,
    )

setup_publisher #

setup_publisher(publisher)
Source code in faststream/nats/broker/broker.py
@override
def setup_publisher(  # type: ignore[override]
    self,
    publisher: "AsyncAPIPublisher",
) -> None:
    """Attach the matching producer to `publisher` via the parent setup.

    A publisher whose `stream` is not `None` gets `self._js_producer`;
    any other publisher gets `self._producer`. Either producer may still be
    `None`, in which case `None` is passed through to the parent.
    """
    producer: Optional[ProducerProto] = (
        self._js_producer if publisher.stream is not None else self._producer
    )

    super().setup_publisher(publisher, producer=producer)

key_value async #

key_value(bucket, *, description=None, max_value_size=None, history=1, ttl=None, max_bytes=None, storage=None, replicas=1, placement=None, republish=None, direct=None, declare=True)
Source code in faststream/nats/broker/broker.py
async def key_value(
    self,
    bucket: str,
    *,
    description: Optional[str] = None,
    max_value_size: Optional[int] = None,
    history: int = 1,
    ttl: Optional[float] = None,  # in seconds
    max_bytes: Optional[int] = None,
    storage: Optional["StorageType"] = None,
    replicas: int = 1,
    placement: Optional["Placement"] = None,
    republish: Optional["RePublish"] = None,
    direct: Optional[bool] = None,
    # custom
    declare: bool = True,
) -> "KeyValue":
    """Return the result of `self._kv_declarer.create_key_value(...)` for `bucket`.

    Every argument is forwarded unchanged, by keyword, to the declarer.

    Raises:
        AssertionError: if `self._kv_declarer` is falsy (the assertion message
            says the broker should be connected already).
    """
    declarer = self._kv_declarer
    assert declarer, "Broker should be connected already."  # nosec B101

    bucket_options = {
        "bucket": bucket,
        "description": description,
        "max_value_size": max_value_size,
        "history": history,
        "ttl": ttl,
        "max_bytes": max_bytes,
        "storage": storage,
        "replicas": replicas,
        "placement": placement,
        "republish": republish,
        "direct": direct,
        "declare": declare,
    }
    return await declarer.create_key_value(**bucket_options)

object_storage async #

object_storage(bucket, *, description=None, ttl=None, max_bytes=None, storage=None, replicas=1, placement=None, declare=True)
Source code in faststream/nats/broker/broker.py
async def object_storage(
    self,
    bucket: str,
    *,
    description: Optional[str] = None,
    ttl: Optional[float] = None,
    max_bytes: Optional[int] = None,
    storage: Optional["StorageType"] = None,
    replicas: int = 1,
    placement: Optional["Placement"] = None,
    # custom
    declare: bool = True,
) -> "ObjectStore":
    """Return the result of `self._os_declarer.create_object_store(...)` for `bucket`.

    Every argument is forwarded unchanged, by keyword, to the declarer.

    Raises:
        AssertionError: if `self._os_declarer` is falsy (the assertion message
            says the broker should be connected already).
    """
    declarer = self._os_declarer
    assert declarer, "Broker should be connected already."  # nosec B101

    store_options = {
        "bucket": bucket,
        "description": description,
        "ttl": ttl,
        "max_bytes": max_bytes,
        "storage": storage,
        "replicas": replicas,
        "placement": placement,
        "declare": declare,
    }
    return await declarer.create_object_store(**store_options)

new_inbox async #

new_inbox()

Return a unique inbox that can be used for NATS requests or subscriptions.

The inbox prefix can be customised by passing inbox_prefix when creating your NatsBroker.

This method calls nats.aio.client.Client.new_inbox [1] under the hood.

[1] https://nats-io.github.io/nats.py/modules.html#nats.aio.client.Client.new_inbox

Source code in faststream/nats/broker/broker.py
async def new_inbox(self) -> str:
    """Return a unique inbox subject for NATS requests or subscriptions.

    The value comes straight from `new_inbox()` on the broker's underlying
    connection, i.e. `nats.aio.client.Client.new_inbox` [1]. The inbox prefix
    can be customised by passing `inbox_prefix` when creating your `NatsBroker`.

    Raises `AssertionError` if the broker has no connection yet.

    [1] https://nats-io.github.io/nats.py/modules.html#nats.aio.client.Client.new_inbox
    """
    connection = self._connection
    assert connection  # nosec B101

    return connection.new_inbox()

ping async #

ping(timeout)
Source code in faststream/nats/broker/broker.py
@override
async def ping(self, timeout: Optional[float]) -> bool:
    """Poll the connection until it reports being connected or `timeout` elapses.

    Returns `True` as soon as `self._connection.is_connected` is truthy.
    Returns `False` if there is no connection object, or if the
    `anyio.move_on_after(timeout)` scope is cancelled first.

    The state is re-checked every `timeout / 10`; when `timeout` is falsy
    (`None` or `0`) the polling interval falls back to `10 / 10 == 1`.
    """
    poll_interval = (timeout or 10) / 10

    with anyio.move_on_after(timeout) as scope:
        if self._connection is None:
            return False

        while not scope.cancel_called:
            if self._connection.is_connected:
                return True

            await anyio.sleep(poll_interval)

    # Reached when the scope was cancelled before a connection was observed.
    return False