Skip to content

NatsRoute

faststream.nats.NatsRoute #

NatsRoute(call, subject, publishers=(), queue='', pending_msgs_limit=None, pending_bytes_limit=None, max_msgs=0, durable=None, config=None, ordered_consumer=False, idle_heartbeat=None, flow_control=False, deliver_policy=None, headers_only=None, pull_sub=None, kv_watch=None, obj_watch=False, inbox_prefix=INBOX_PREFIX, ack_first=False, stream=None, dependencies=(), parser=None, decoder=None, middlewares=(), filter=default_filter, max_workers=1, retry=False, no_ack=False, no_reply=False, title=None, description=None, include_in_schema=True)

Bases: SubscriberRoute

Class to store delayed NatsBroker subscriber registration.

Source code in faststream/nats/router.py
def __init__(
    self,
    call: Annotated[
        Union[
            Callable[..., "SendableMessage"],
            Callable[..., Awaitable["SendableMessage"]],
        ],
        Doc(
            "Message handler function "
            "to wrap the same with `@broker.subscriber(...)` way."
        ),
    ],
    subject: Annotated[
        str,
        Doc("NATS subject to subscribe."),
    ],
    publishers: Annotated[
        Iterable[NatsPublisher],
        Doc("Nats publishers to broadcast the handler result."),
    ] = (),
    queue: Annotated[
        str,
        Doc(
            "Subscribers' NATS queue name. Subscribers with same queue name will be load balanced by the NATS "
            "server."
        ),
    ] = "",
    pending_msgs_limit: Annotated[
        Optional[int],
        Doc(
            "Limit of messages, considered by NATS server as possible to be delivered to the client without "
            "been answered. In case of NATS Core, if that limits exceeds, you will receive NATS 'Slow Consumer' "
            "error. "
            "That's literally means that your worker can't handle the whole load. In case of NATS JetStream, "
            "you will no longer receive messages until some of delivered messages will be acked in any way."
        ),
    ] = None,
    pending_bytes_limit: Annotated[
        Optional[int],
        Doc(
            "The number of bytes, considered by NATS server as possible to be delivered to the client without "
            "been answered. In case of NATS Core, if that limit exceeds, you will receive NATS 'Slow Consumer' "
            "error."
            "That's literally means that your worker can't handle the whole load. In case of NATS JetStream, "
            "you will no longer receive messages until some of delivered messages will be acked in any way."
        ),
    ] = None,
    # Core arguments
    max_msgs: Annotated[
        int,
        Doc("Consuming messages limiter. Automatically disconnect if reached."),
    ] = 0,
    # JS arguments
    durable: Annotated[
        Optional[str],
        Doc(
            "Name of the durable consumer to which the the subscription should be bound."
        ),
    ] = None,
    config: Annotated[
        Optional["api.ConsumerConfig"],
        Doc("Configuration of JetStream consumer to be subscribed with."),
    ] = None,
    ordered_consumer: Annotated[
        bool,
        Doc("Enable ordered consumer mode."),
    ] = False,
    idle_heartbeat: Annotated[
        Optional[float],
        Doc("Enable Heartbeats for a consumer to detect failures."),
    ] = None,
    flow_control: Annotated[
        bool,
        Doc("Enable Flow Control for a consumer."),
    ] = False,
    deliver_policy: Annotated[
        Optional["api.DeliverPolicy"],
        Doc("Deliver Policy to be used for subscription."),
    ] = None,
    headers_only: Annotated[
        Optional[bool],
        Doc(
            "Should be message delivered without payload, only headers and metadata."
        ),
    ] = None,
    # pull arguments
    pull_sub: Annotated[
        Optional["PullSub"],
        Doc(
            "NATS Pull consumer parameters container. "
            "Should be used with `stream` only."
        ),
    ] = None,
    kv_watch: Annotated[
        Union[str, "KvWatch", None],
        Doc("KeyValue watch parameters container."),
    ] = None,
    obj_watch: Annotated[
        Union[bool, "ObjWatch"],
        Doc("ObjecStore watch parameters container."),
    ] = False,
    inbox_prefix: Annotated[
        bytes,
        Doc(
            "Prefix for generating unique inboxes, subjects with that prefix and NUID."
        ),
    ] = api.INBOX_PREFIX,
    # custom
    ack_first: Annotated[
        bool,
        Doc("Whether to `ack` message at start of consuming or not."),
    ] = False,
    stream: Annotated[
        Union[str, "JStream", None],
        Doc("Subscribe to NATS Stream with `subject` filter."),
    ] = None,
    # broker arguments
    dependencies: Annotated[
        Iterable["Depends"],
        Doc("Dependencies list (`[Depends(),]`) to apply to the subscriber."),
    ] = (),
    parser: Annotated[
        Optional["CustomCallable"],
        Doc("Parser to map original **nats-py** Msg to FastStream one."),
    ] = None,
    decoder: Annotated[
        Optional["CustomCallable"],
        Doc("Function to decode FastStream msg bytes body to python objects."),
    ] = None,
    middlewares: Annotated[
        Iterable["SubscriberMiddleware[NatsMessage]"],
        Doc("Subscriber middlewares to wrap incoming message processing."),
    ] = (),
    filter: Annotated[
        Union[
            "Filter[NatsMessage]",
            "Filter[NatsBatchMessage]",
        ],
        Doc(
            "Overload subscriber to consume various messages from the same source."
        ),
        deprecated(
            "Deprecated in **FastStream 0.5.0**. "
            "Please, create `subscriber` object and use it explicitly instead. "
            "Argument will be removed in **FastStream 0.6.0**."
        ),
    ] = default_filter,
    max_workers: Annotated[
        int,
        Doc("Number of workers to process messages concurrently."),
    ] = 1,
    retry: Annotated[
        bool,
        Doc("Whether to `nack` message at processing exception."),
    ] = False,
    no_ack: Annotated[
        bool,
        Doc("Whether to disable **FastStream** autoacknowledgement logic or not."),
    ] = False,
    no_reply: Annotated[
        bool,
        Doc(
            "Whether to disable **FastStream** RPC and Reply To auto responses or not."
        ),
    ] = False,
    # AsyncAPI information
    title: Annotated[
        Optional[str],
        Doc("AsyncAPI subscriber object title."),
    ] = None,
    description: Annotated[
        Optional[str],
        Doc(
            "AsyncAPI subscriber object description. "
            "Uses decorated docstring as default."
        ),
    ] = None,
    include_in_schema: Annotated[
        bool,
        Doc("Whetever to include operation in AsyncAPI schema or not."),
    ] = True,
) -> None:
    super().__init__(
        call,
        subject=subject,
        publishers=publishers,
        pending_msgs_limit=pending_msgs_limit,
        pending_bytes_limit=pending_bytes_limit,
        max_msgs=max_msgs,
        durable=durable,
        config=config,
        ordered_consumer=ordered_consumer,
        idle_heartbeat=idle_heartbeat,
        flow_control=flow_control,
        deliver_policy=deliver_policy,
        headers_only=headers_only,
        pull_sub=pull_sub,
        kv_watch=kv_watch,
        obj_watch=obj_watch,
        inbox_prefix=inbox_prefix,
        ack_first=ack_first,
        stream=stream,
        max_workers=max_workers,
        queue=queue,
        dependencies=dependencies,
        parser=parser,
        decoder=decoder,
        middlewares=middlewares,
        filter=filter,
        retry=retry,
        no_ack=no_ack,
        no_reply=no_reply,
        title=title,
        description=description,
        include_in_schema=include_in_schema,
    )

args instance-attribute #

args = args

kwargs instance-attribute #

kwargs = kwargs

call instance-attribute #

call = call

publishers instance-attribute #

publishers = publishers