Skip to content

NatsRouter

faststream.nats.NatsRouter #

NatsRouter(
    prefix: str = "",
    handlers: Sequence[NatsRoute] = (),
    *,
    dependencies: Sequence[Depends] = (),
    middlewares: Optional[
        Sequence[Callable[[Msg], BaseMiddleware]]
    ] = None,
    parser: Optional[CustomParser[Msg, NatsMessage]] = None,
    decoder: Optional[CustomDecoder[NatsMessage]] = None
)

Bases: NatsRouter

Source code in faststream/nats/router.py
    subject: str,
    headers: Optional[Dict[str, str]] = None,
    reply_to: str = "",
    # AsyncAPI information
    title: Optional[str] = None,
    description: Optional[str] = None,
    schema: Optional[Any] = None,
) -> Publisher:
    new_publisher = self._update_publisher_prefix(
        self.prefix,

prefix instance-attribute #

prefix: str = prefix

include_router #

include_router(
    router: BrokerRouter[PublisherKeyType, MsgType]
) -> None

Includes a router in the current object.

PARAMETER DESCRIPTION
router

The router to be included.

TYPE: BrokerRouter[PublisherKeyType, MsgType]

RETURNS DESCRIPTION
None

None

Note

The above docstring is autogenerated by docstring-gen library (https://docstring-gen.airt.ai)

Source code in faststream/broker/router.py
def include_router(self, router: "BrokerRouter[PublisherKeyType, MsgType]") -> None:
    """Includes a router in the current object.

    Args:
        router: The router to be included.

    Returns:
        None
    !!! note

        The above docstring is autogenerated by docstring-gen library (https://docstring-gen.airt.ai)
    """
    for h in router._handlers:
        self.subscriber(*h.args, **h.kwargs)(h.call)

    for p in router._publishers.values():
        p = self._update_publisher_prefix(self.prefix, p)
        key = self._get_publisher_key(p)
        self._publishers[key] = self._publishers.get(key, p)

include_routers #

include_routers(
    *routers: BrokerRouter[PublisherKeyType, MsgType]
) -> None

Includes routers in the object.

PARAMETER DESCRIPTION
*routers

Variable length argument list of routers to include.

TYPE: BrokerRouter[PublisherKeyType, MsgType] DEFAULT: ()

RETURNS DESCRIPTION
None

None

Note

The above docstring is autogenerated by docstring-gen library (https://docstring-gen.airt.ai)

Source code in faststream/broker/router.py
def include_routers(
    self, *routers: "BrokerRouter[PublisherKeyType, MsgType]"
) -> None:
    """Includes routers in the object.

    Args:
        *routers: Variable length argument list of routers to include.

    Returns:
        None
    !!! note

        The above docstring is autogenerated by docstring-gen library (https://docstring-gen.airt.ai)
    """
    for r in routers:
        self.include_router(r)

publisher #

publisher(
    subject: str,
    headers: Optional[Dict[str, str]] = None,
    reply_to: str = "",
    title: Optional[str] = None,
    description: Optional[str] = None,
    schema: Optional[Any] = None,
) -> Publisher
Source code in faststream/nats/router.py
@override
def publisher(  # type: ignore[override]
    self,
    subject: str,
    headers: Optional[Dict[str, str]] = None,
    reply_to: str = "",
    # AsyncAPI information
    title: Optional[str] = None,
    description: Optional[str] = None,
    schema: Optional[Any] = None,
) -> Publisher:
    new_publisher = self._update_publisher_prefix(
        self.prefix,
        Publisher(
            subject=subject,
            reply_to=reply_to,
            headers=headers,
            title=title,
            _description=description,
            _schema=schema,
        ),
    )
    publisher_key = self._get_publisher_key(new_publisher)
    publisher = self._publishers[publisher_key] = self._publishers.get(
        publisher_key, new_publisher
    )
    return publisher

subscriber #

subscriber(
    subject: str,
    queue: str = "",
    pending_msgs_limit: Optional[int] = None,
    pending_bytes_limit: Optional[int] = None,
    max_msgs: int = 0,
    ack_first: bool = False,
    stream: Union[str, JStream, None] = None,
    durable: Optional[str] = None,
    config: Optional[api.ConsumerConfig] = None,
    ordered_consumer: bool = False,
    idle_heartbeat: Optional[float] = None,
    flow_control: bool = False,
    deliver_policy: Optional[api.DeliverPolicy] = None,
    headers_only: Optional[bool] = None,
    pull_sub: Optional[PullSub] = None,
    inbox_prefix: bytes = api.INBOX_PREFIX,
    dependencies: Sequence[Depends] = (),
    parser: Optional[CustomParser[Msg, NatsMessage]] = None,
    decoder: Optional[CustomDecoder[NatsMessage]] = None,
    middlewares: Optional[
        Sequence[Callable[[Msg], BaseMiddleware]]
    ] = None,
    filter: Filter[NatsMessage] = default_filter,
    retry: bool = False,
    title: Optional[str] = None,
    description: Optional[str] = None,
    **__service_kwargs: Any
) -> Callable[
    [Callable[P_HandlerParams, T_HandlerReturn]],
    HandlerCallWrapper[
        Msg, P_HandlerParams, T_HandlerReturn
    ],
]

Last update: 2023-11-13