Skip to content

NatsRouter

faststream.nats.NatsRouter #

NatsRouter(prefix='', handlers=(), *, dependencies=(), middlewares=(), parser=None, decoder=None, include_in_schema=None)

Bases: NatsRegistrator, BrokerRouter['Msg']

Includable to NatsBroker router.

Source code in faststream/nats/router.py
def __init__(
    self,
    prefix: Annotated[
        str,
        Doc("String prefix to add to all subscribers subjects."),
    ] = "",
    handlers: Annotated[
        Iterable[NatsRoute],
        Doc("Route object to include."),
    ] = (),
    *,
    dependencies: Annotated[
        Iterable["Depends"],
        Doc(
            "Dependencies list (`[Depends(),]`) to apply to all routers' publishers/subscribers."
        ),
    ] = (),
    middlewares: Annotated[
        Iterable["BrokerMiddleware[Msg]"],
        Doc("Router middlewares to apply to all routers' publishers/subscribers."),
    ] = (),
    parser: Annotated[
        Optional["CustomCallable"],
        Doc("Parser to map original **IncomingMessage** Msg to FastStream one."),
    ] = None,
    decoder: Annotated[
        Optional["CustomCallable"],
        Doc("Function to decode FastStream msg bytes body to python objects."),
    ] = None,
    include_in_schema: Annotated[
        Optional[bool],
        Doc("Whetever to include operation in AsyncAPI schema or not."),
    ] = None,
) -> None:
    super().__init__(
        handlers=handlers,
        # basic args
        prefix=prefix,
        dependencies=dependencies,
        middlewares=middlewares,
        parser=parser,
        decoder=decoder,
        include_in_schema=include_in_schema,
    )

prefix instance-attribute #

prefix = prefix

include_in_schema instance-attribute #

include_in_schema = include_in_schema

add_middleware #

add_middleware(middleware)

Append BrokerMiddleware to the end of middlewares list.

Current middleware will be used as a most inner of already existed ones.

Source code in faststream/broker/core/abc.py
def add_middleware(self, middleware: "BrokerMiddleware[MsgType]") -> None:
    """Append BrokerMiddleware to the end of middlewares list.

    Current middleware will be used as a most inner of already existed ones.
    """
    self._middlewares = (*self._middlewares, middleware)

    for sub in self._subscribers.values():
        sub.add_middleware(middleware)

    for pub in self._publishers.values():
        pub.add_middleware(middleware)

subscriber #

subscriber(subject='', queue='', pending_msgs_limit=None, pending_bytes_limit=None, max_msgs=0, durable=None, config=None, ordered_consumer=False, idle_heartbeat=None, flow_control=False, deliver_policy=None, headers_only=None, pull_sub=False, kv_watch=None, obj_watch=False, inbox_prefix=api.INBOX_PREFIX, ack_first=False, stream=None, dependencies=(), parser=None, decoder=None, middlewares=(), filter=default_filter, max_workers=1, retry=False, no_ack=False, no_reply=False, title=None, description=None, include_in_schema=True)

Creates NATS subscriber object.

You can use it as a handler decorator @broker.subscriber(...).

Source code in faststream/nats/broker/registrator.py
@override
def subscriber(  # type: ignore[override]
    self,
    subject: Annotated[
        str,
        Doc("NATS subject to subscribe."),
    ] = "",
    queue: Annotated[
        str,
        Doc(
            "Subscribers' NATS queue name. Subscribers with same queue name will be load balanced by the NATS "
            "server."
        ),
    ] = "",
    pending_msgs_limit: Annotated[
        Optional[int],
        Doc(
            "Limit of messages, considered by NATS server as possible to be delivered to the client without "
            "been answered. In case of NATS Core, if that limits exceeds, you will receive NATS 'Slow Consumer' "
            "error. "
            "That's literally means that your worker can't handle the whole load. In case of NATS JetStream, "
            "you will no longer receive messages until some of delivered messages will be acked in any way."
        ),
    ] = None,
    pending_bytes_limit: Annotated[
        Optional[int],
        Doc(
            "The number of bytes, considered by NATS server as possible to be delivered to the client without "
            "been answered. In case of NATS Core, if that limit exceeds, you will receive NATS 'Slow Consumer' "
            "error."
            "That's literally means that your worker can't handle the whole load. In case of NATS JetStream, "
            "you will no longer receive messages until some of delivered messages will be acked in any way."
        ),
    ] = None,
    # Core arguments
    max_msgs: Annotated[
        int,
        Doc("Consuming messages limiter. Automatically disconnect if reached."),
    ] = 0,
    # JS arguments
    durable: Annotated[
        Optional[str],
        Doc(
            "Name of the durable consumer to which the the subscription should be bound."
        ),
    ] = None,
    config: Annotated[
        Optional["api.ConsumerConfig"],
        Doc("Configuration of JetStream consumer to be subscribed with."),
    ] = None,
    ordered_consumer: Annotated[
        bool,
        Doc("Enable ordered consumer mode."),
    ] = False,
    idle_heartbeat: Annotated[
        Optional[float],
        Doc("Enable Heartbeats for a consumer to detect failures."),
    ] = None,
    flow_control: Annotated[
        bool,
        Doc("Enable Flow Control for a consumer."),
    ] = False,
    deliver_policy: Annotated[
        Optional["api.DeliverPolicy"],
        Doc("Deliver Policy to be used for subscription."),
    ] = None,
    headers_only: Annotated[
        Optional[bool],
        Doc(
            "Should be message delivered without payload, only headers and metadata."
        ),
    ] = None,
    # pull arguments
    pull_sub: Annotated[
        Union[bool, "PullSub"],
        Doc(
            "NATS Pull consumer parameters container. "
            "Should be used with `stream` only."
        ),
    ] = False,
    kv_watch: Annotated[
        Union[str, "KvWatch", None],
        Doc("KeyValue watch parameters container."),
    ] = None,
    obj_watch: Annotated[
        Union[bool, "ObjWatch"],
        Doc("ObjecStore watch parameters container."),
    ] = False,
    inbox_prefix: Annotated[
        bytes,
        Doc(
            "Prefix for generating unique inboxes, subjects with that prefix and NUID."
        ),
    ] = api.INBOX_PREFIX,
    # custom
    ack_first: Annotated[
        bool,
        Doc("Whether to `ack` message at start of consuming or not."),
    ] = False,
    stream: Annotated[
        Union[str, "JStream", None],
        Doc("Subscribe to NATS Stream with `subject` filter."),
    ] = None,
    # broker arguments
    dependencies: Annotated[
        Iterable["Depends"],
        Doc("Dependencies list (`[Depends(),]`) to apply to the subscriber."),
    ] = (),
    parser: Annotated[
        Optional["CustomCallable"],
        Doc("Parser to map original **nats-py** Msg to FastStream one."),
    ] = None,
    decoder: Annotated[
        Optional["CustomCallable"],
        Doc("Function to decode FastStream msg bytes body to python objects."),
    ] = None,
    middlewares: Annotated[
        Iterable["SubscriberMiddleware[NatsMessage]"],
        Doc("Subscriber middlewares to wrap incoming message processing."),
    ] = (),
    filter: Annotated[
        Union[
            "Filter[NatsMessage]",
            "Filter[NatsBatchMessage]",
        ],
        Doc(
            "Overload subscriber to consume various messages from the same source."
        ),
        deprecated(
            "Deprecated in **FastStream 0.5.0**. "
            "Please, create `subscriber` object and use it explicitly instead. "
            "Argument will be removed in **FastStream 0.6.0**."
        ),
    ] = default_filter,
    max_workers: Annotated[
        int,
        Doc("Number of workers to process messages concurrently."),
    ] = 1,
    retry: Annotated[
        bool,
        Doc("Whether to `nack` message at processing exception."),
    ] = False,
    no_ack: Annotated[
        bool,
        Doc("Whether to disable **FastStream** autoacknowledgement logic or not."),
    ] = False,
    no_reply: Annotated[
        bool,
        Doc(
            "Whether to disable **FastStream** RPC and Reply To auto responses or not."
        ),
    ] = False,
    # AsyncAPI information
    title: Annotated[
        Optional[str],
        Doc("AsyncAPI subscriber object title."),
    ] = None,
    description: Annotated[
        Optional[str],
        Doc(
            "AsyncAPI subscriber object description. "
            "Uses decorated docstring as default."
        ),
    ] = None,
    include_in_schema: Annotated[
        bool,
        Doc("Whetever to include operation in AsyncAPI schema or not."),
    ] = True,
) -> AsyncAPISubscriber:
    """Creates NATS subscriber object.

    You can use it as a handler decorator `@broker.subscriber(...)`.
    """
    stream = self._stream_builder.create(stream)

    subscriber = cast(
        AsyncAPISubscriber,
        super().subscriber(
            create_subscriber(
                subject=subject,
                queue=queue,
                stream=stream,
                pull_sub=PullSub.validate(pull_sub),
                kv_watch=KvWatch.validate(kv_watch),
                obj_watch=ObjWatch.validate(obj_watch),
                max_workers=max_workers,
                # extra args
                pending_msgs_limit=pending_msgs_limit,
                pending_bytes_limit=pending_bytes_limit,
                max_msgs=max_msgs,
                durable=durable,
                config=config,
                ordered_consumer=ordered_consumer,
                idle_heartbeat=idle_heartbeat,
                flow_control=flow_control,
                deliver_policy=deliver_policy,
                headers_only=headers_only,
                inbox_prefix=inbox_prefix,
                ack_first=ack_first,
                # subscriber args
                no_ack=no_ack,
                no_reply=no_reply,
                retry=retry,
                broker_middlewares=self._middlewares,
                broker_dependencies=self._dependencies,
                # AsyncAPI
                title_=title,
                description_=description,
                include_in_schema=self._solve_include_in_schema(include_in_schema),
            )
        ),
    )

    if stream and subscriber.subject:
        stream.add_subject(subscriber.subject)

    return subscriber.add_call(
        filter_=filter,
        parser_=parser or self._parser,
        decoder_=decoder or self._decoder,
        dependencies_=dependencies,
        middlewares_=middlewares,
    )

publisher #

publisher(subject, *, headers=None, reply_to='', stream=None, timeout=None, middlewares=(), title=None, description=None, schema=None, include_in_schema=True)

Creates long-living and AsyncAPI-documented publisher object.

You can use it as a handler decorator (handler should be decorated by @broker.subscriber(...) too) - @broker.publisher(...). In such case publisher will publish your handler return value.

Or you can create a publisher object to call it lately - broker.publisher(...).publish(...).

Source code in faststream/nats/broker/registrator.py
@override
def publisher(  # type: ignore[override]
    self,
    subject: Annotated[
        str,
        Doc("NATS subject to send message."),
    ],
    *,
    headers: Annotated[
        Optional[Dict[str, str]],
        Doc(
            "Message headers to store metainformation. "
            "**content-type** and **correlation_id** will be set automatically by framework anyway. "
            "Can be overridden by `publish.headers` if specified."
        ),
    ] = None,
    reply_to: Annotated[
        str,
        Doc("NATS subject name to send response."),
    ] = "",
    # JS
    stream: Annotated[
        Union[str, "JStream", None],
        Doc(
            "This option validates that the target `subject` is in presented stream. "
            "Can be omitted without any effect."
        ),
    ] = None,
    timeout: Annotated[
        Optional[float],
        Doc("Timeout to send message to NATS."),
    ] = None,
    # basic args
    middlewares: Annotated[
        Iterable["PublisherMiddleware"],
        Doc("Publisher middlewares to wrap outgoing messages."),
    ] = (),
    # AsyncAPI information
    title: Annotated[
        Optional[str],
        Doc("AsyncAPI publisher object title."),
    ] = None,
    description: Annotated[
        Optional[str],
        Doc("AsyncAPI publisher object description."),
    ] = None,
    schema: Annotated[
        Optional[Any],
        Doc(
            "AsyncAPI publishing message type. "
            "Should be any python-native object annotation or `pydantic.BaseModel`."
        ),
    ] = None,
    include_in_schema: Annotated[
        bool,
        Doc("Whetever to include operation in AsyncAPI schema or not."),
    ] = True,
) -> "AsyncAPIPublisher":
    """Creates long-living and AsyncAPI-documented publisher object.

    You can use it as a handler decorator (handler should be decorated by `@broker.subscriber(...)` too) - `@broker.publisher(...)`.
    In such case publisher will publish your handler return value.

    Or you can create a publisher object to call it lately - `broker.publisher(...).publish(...)`.
    """
    stream = self._stream_builder.create(stream)

    publisher = cast(
        AsyncAPIPublisher,
        super().publisher(
            publisher=AsyncAPIPublisher.create(
                subject=subject,
                headers=headers,
                # Core
                reply_to=reply_to,
                # JS
                timeout=timeout,
                stream=stream,
                # Specific
                broker_middlewares=self._middlewares,
                middlewares=middlewares,
                # AsyncAPI
                title_=title,
                description_=description,
                schema_=schema,
                include_in_schema=self._solve_include_in_schema(include_in_schema),
            )
        ),
    )

    if stream and publisher.subject:
        stream.add_subject(publisher.subject)

    return publisher

include_router #

include_router(router, *, prefix='', dependencies=(), middlewares=(), include_in_schema=None)
Source code in faststream/nats/broker/registrator.py
@override
def include_router(  # type: ignore[override]
    self,
    router: "NatsRegistrator",
    *,
    prefix: str = "",
    dependencies: Iterable["Depends"] = (),
    middlewares: Iterable["BrokerMiddleware[Msg]"] = (),
    include_in_schema: Optional[bool] = None,
) -> None:
    sub_streams = router._stream_builder.objects.copy()

    sub_router_subjects = [sub.subject for sub in router._subscribers.values()]

    for stream in sub_streams.values():
        new_subjects = []
        for subj in stream.subjects:
            if subj in sub_router_subjects:
                new_subjects.append("".join((self.prefix, subj)))
            else:
                new_subjects.append(subj)
        stream.subjects = new_subjects

    self._stream_builder.objects.update(sub_streams)

    return super().include_router(
        router,
        prefix=prefix,
        dependencies=dependencies,
        middlewares=middlewares,
        include_in_schema=include_in_schema,
    )

include_routers #

include_routers(*routers)

Includes routers in the object.

Source code in faststream/broker/core/abc.py
def include_routers(
    self,
    *routers: "ABCBroker[MsgType]",
) -> None:
    """Includes routers in the object."""
    for r in routers:
        self.include_router(r)