Skip to content

NatsPublisher

faststream.nats.router.NatsPublisher #

NatsPublisher(subject, *, headers=None, reply_to='', stream=None, timeout=None, middlewares=(), title=None, description=None, schema=None, include_in_schema=True)

Bases: ArgsContainer

Delayed NatsPublisher registration object.

Just a copy of NatsRegistrator.publisher(...) arguments.

Source code in faststream/nats/router.py
def __init__(
    self,
    subject: Annotated[
        str,
        Doc("NATS subject to send message."),
    ],
    *,
    headers: Annotated[
        Optional[Dict[str, str]],
        Doc(
            "Message headers to store metainformation. "
            "**content-type** and **correlation_id** will be set automatically by framework anyway. "
            "Can be overridden by `publish.headers` if specified."
        ),
    ] = None,
    reply_to: Annotated[
        str,
        Doc("NATS subject name to send response."),
    ] = "",
    # JS
    stream: Annotated[
        Union[str, "JStream", None],
        Doc(
            "This option validates that the target `subject` is in presented stream. "
            "Can be omitted without any effect."
        ),
    ] = None,
    timeout: Annotated[
        Optional[float],
        Doc("Timeout to send message to NATS."),
    ] = None,
    # basic args
    middlewares: Annotated[
        Sequence["PublisherMiddleware"],
        Doc("Publisher middlewares to wrap outgoing messages."),
    ] = (),
    # AsyncAPI information
    title: Annotated[
        Optional[str],
        Doc("AsyncAPI publisher object title."),
    ] = None,
    description: Annotated[
        Optional[str],
        Doc("AsyncAPI publisher object description."),
    ] = None,
    schema: Annotated[
        Optional[Any],
        Doc(
            "AsyncAPI publishing message type. "
            "Should be any python-native object annotation or `pydantic.BaseModel`."
        ),
    ] = None,
    include_in_schema: Annotated[
        bool,
        Doc("Whetever to include operation in AsyncAPI schema or not."),
    ] = True,
) -> None:
    """Capture publisher options and hand them to the parent initializer.

    Every parameter is forwarded unchanged, by keyword, to
    ``super().__init__``; no validation or transformation happens here.
    Per-parameter descriptions live in the ``Doc(...)`` annotations of the
    signature.
    """
    # Keep the insertion order identical to the signature so the keyword
    # mapping received by the parent has the same key order as before.
    publisher_options = {
        "subject": subject,
        "headers": headers,
        "reply_to": reply_to,
        "stream": stream,
        "timeout": timeout,
        "middlewares": middlewares,
        "title": title,
        "description": description,
        "schema": schema,
        "include_in_schema": include_in_schema,
    }
    # NOTE(review): the parent (ArgsContainer) presumably stores these as
    # its `kwargs` attribute for later registration -- confirm in its source.
    super().__init__(**publisher_options)

args instance-attribute #

args = args

kwargs instance-attribute #

kwargs = kwargs