Skip to content

AsyncAPIPublisher

faststream.nats.publisher.asyncapi.AsyncAPIPublisher #

AsyncAPIPublisher(*, subject, reply_to, headers, stream, timeout, broker_middlewares, middlewares, schema_, title_, description_, include_in_schema)

Bases: LogicPublisher

A class to represent a NATS publisher.

Initialize NATS publisher object.

Source code in faststream/nats/publisher/usecase.py
def __init__(
    self,
    *,
    subject: str,
    reply_to: str,
    headers: Optional[Dict[str, str]],
    stream: Optional["JStream"],
    timeout: Optional[float],
    # Publisher args
    broker_middlewares: Iterable["BrokerMiddleware[Msg]"],
    middlewares: Iterable["PublisherMiddleware"],
    # AsyncAPI args
    schema_: Optional[Any],
    title_: Optional[str],
    description_: Optional[str],
    include_in_schema: bool,
) -> None:
    """Initialize NATS publisher object."""
    super().__init__(
        broker_middlewares=broker_middlewares,
        middlewares=middlewares,
        # AsyncAPI args
        schema_=schema_,
        title_=title_,
        description_=description_,
        include_in_schema=include_in_schema,
    )

    self.subject = subject
    self.stream = stream
    self.timeout = timeout
    self.headers = headers
    self.reply_to = reply_to

title_ instance-attribute #

title_ = title_

description_ instance-attribute #

description_ = description_

include_in_schema instance-attribute #

include_in_schema = include_in_schema

name property #

name

Returns the name of the API operation.

description property #

description

Returns the description of the API operation.

schema_ instance-attribute #

schema_ = schema_

mock instance-attribute #

mock = None

calls instance-attribute #

calls = []

subject instance-attribute #

subject = subject

stream instance-attribute #

stream = stream

timeout instance-attribute #

timeout = timeout

headers instance-attribute #

headers = headers

reply_to instance-attribute #

reply_to = reply_to

publish async #

publish(message, subject='', *, headers=None, reply_to='', correlation_id=None, stream=None, timeout=None, rpc=False, rpc_timeout=30.0, raise_timeout=False, _extra_middlewares=())

Publish message directly.

PARAMETER DESCRIPTION
message

Message body to send. Can be any encodable object (native python types or pydantic.BaseModel).

TYPE: SendableMessage

subject

NATS subject to send message (default is '').

TYPE: str DEFAULT: ''

headers

obj:dict of :obj:str: :obj:str, optional): Message headers to store metainformation (default is None). content-type and correlation_id will be set automatically by framework anyway.

DEFAULT: None

reply_to

NATS subject name to send response (default is None).

TYPE: str DEFAULT: ''

correlation_id

Manual message correlation_id setter (default is None). correlation_id is a useful option to trace messages.

TYPE: str DEFAULT: None

stream

This option validates that the target subject is in presented stream (default is None). Can be omitted without any effect.

TYPE: str DEFAULT: None

timeout

Timeout to send message to NATS in seconds (default is None).

TYPE: float DEFAULT: None

rpc

Whether to wait for reply in blocking mode (default is False).

TYPE: bool DEFAULT: False

rpc_timeout

RPC reply waiting time (default is 30.0).

TYPE: float DEFAULT: 30.0

raise_timeout

Whetever to raise TimeoutError or return None at rpc_timeout (default is False). RPC request returns None at timeout by default.

TYPE: bool DEFAULT: False

_extra_middlewares

obj:Iterable of :obj:PublisherMiddleware): Extra middlewares to wrap publishing process (default is ()).

DEFAULT: ()

Source code in faststream/nats/publisher/usecase.py
@override
async def publish(
    self,
    message: "SendableMessage",
    subject: str = "",
    *,
    headers: Optional[Dict[str, str]] = None,
    reply_to: str = "",
    correlation_id: Optional[str] = None,
    stream: Optional[str] = None,
    timeout: Optional[float] = None,
    rpc: bool = False,
    rpc_timeout: Optional[float] = 30.0,
    raise_timeout: bool = False,
    # publisher specific
    _extra_middlewares: Iterable["PublisherMiddleware"] = (),
) -> Optional[Any]:
    """Publish message directly.

    Args:
        message (SendableMessage): Message body to send.
            Can be any encodable object (native python types or `pydantic.BaseModel`).
        subject (str): NATS subject to send message (default is `''`).
        headers (:obj:`dict` of :obj:`str`: :obj:`str`, optional): Message headers to store metainformation (default is `None`).
            **content-type** and **correlation_id** will be set automatically by framework anyway.

        reply_to (str): NATS subject name to send response (default is `None`).
        correlation_id (str, optional): Manual message **correlation_id** setter (default is `None`).
            **correlation_id** is a useful option to trace messages.

        stream (str, optional): This option validates that the target subject is in presented stream (default is `None`).
            Can be omitted without any effect.
        timeout (float, optional): Timeout to send message to NATS in seconds (default is `None`).
        rpc (bool): Whether to wait for reply in blocking mode (default is `False`).
        rpc_timeout (float, optional): RPC reply waiting time (default is `30.0`).
        raise_timeout (bool): Whetever to raise `TimeoutError` or return `None` at **rpc_timeout** (default is `False`).
            RPC request returns `None` at timeout by default.

        _extra_middlewares (:obj:`Iterable` of :obj:`PublisherMiddleware`): Extra middlewares to wrap publishing process (default is `()`).
    """
    assert self._producer, NOT_CONNECTED_YET  # nosec B101

    kwargs: AnyDict = {
        "subject": subject or self.subject,
        "headers": headers or self.headers,
        "reply_to": reply_to or self.reply_to,
        "correlation_id": correlation_id or gen_cor_id(),
        # specific args
        "rpc": rpc,
        "rpc_timeout": rpc_timeout,
        "raise_timeout": raise_timeout,
    }

    if stream := stream or getattr(self.stream, "name", None):
        kwargs.update({"stream": stream, "timeout": timeout or self.timeout})

    call: AsyncFunc = self._producer.publish

    for m in chain(
        (
            _extra_middlewares
            or (m(None).publish_scope for m in self._broker_middlewares)
        ),
        self._middlewares,
    ):
        call = partial(m, call)

    return await call(message, **kwargs)

request async #

request(message, subject='', *, headers=None, correlation_id=None, timeout=0.5, _extra_middlewares=())
Source code in faststream/nats/publisher/usecase.py
@override
async def request(
    self,
    message: Annotated[
        "SendableMessage",
        Doc(
            "Message body to send. "
            "Can be any encodable object (native python types or `pydantic.BaseModel`)."
        ),
    ],
    subject: Annotated[
        str,
        Doc("NATS subject to send message."),
    ] = "",
    *,
    headers: Annotated[
        Optional[Dict[str, str]],
        Doc(
            "Message headers to store metainformation. "
            "**content-type** and **correlation_id** will be set automatically by framework anyway."
        ),
    ] = None,
    correlation_id: Annotated[
        Optional[str],
        Doc(
            "Manual message **correlation_id** setter. "
            "**correlation_id** is a useful option to trace messages."
        ),
    ] = None,
    timeout: Annotated[
        float,
        Doc("Timeout to send message to NATS."),
    ] = 0.5,
    # publisher specific
    _extra_middlewares: Annotated[
        Iterable["PublisherMiddleware"],
        Doc("Extra middlewares to wrap publishing process."),
    ] = (),
) -> "NatsMessage":
    assert self._producer, NOT_CONNECTED_YET  # nosec B101

    kwargs: AnyDict = {
        "subject": subject or self.subject,
        "headers": headers or self.headers,
        "timeout": timeout or self.timeout,
        "correlation_id": correlation_id or gen_cor_id(),
    }

    request: AsyncFunc = self._producer.request

    for pub_m in chain(
        (
            _extra_middlewares
            or (m(None).publish_scope for m in self._broker_middlewares)
        ),
        self._middlewares,
    ):
        request = partial(pub_m, request)

    published_msg = await request(
        message,
        **kwargs,
    )

    async with AsyncExitStack() as stack:
        return_msg: Callable[[NatsMessage], Awaitable[NatsMessage]] = return_input
        for m in self._broker_middlewares:
            mid = m(published_msg)
            await stack.enter_async_context(mid)
            return_msg = partial(mid.consume_scope, return_msg)

        parsed_msg = await self._producer._parser(published_msg)
        parsed_msg._decoded_body = await self._producer._decoder(parsed_msg)
        return await return_msg(parsed_msg)

    raise AssertionError("unreachable")

setup #

setup(*, producer)
Source code in faststream/broker/publisher/usecase.py
@override
def setup(  # type: ignore[override]
    self,
    *,
    producer: Optional["ProducerProto"],
) -> None:
    self._producer = producer

add_prefix #

add_prefix(prefix)
Source code in faststream/nats/publisher/usecase.py
def add_prefix(self, prefix: str) -> None:
    self.subject = prefix + self.subject

schema #

schema()

Returns the schema of the API operation as a dictionary of channel names and channel objects.

Source code in faststream/asyncapi/abc.py
def schema(self) -> Dict[str, Channel]:
    """Returns the schema of the API operation as a dictionary of channel names and channel objects."""
    if self.include_in_schema:
        return self.get_schema()
    else:
        return {}

add_middleware #

add_middleware(middleware)
Source code in faststream/broker/publisher/usecase.py
def add_middleware(self, middleware: "BrokerMiddleware[MsgType]") -> None:
    self._broker_middlewares = (*self._broker_middlewares, middleware)

get_description #

get_description()

Description property fallback.

Source code in faststream/asyncapi/abc.py
def get_description(self) -> Optional[str]:
    """Description property fallback."""
    return None

get_payloads #

get_payloads()
Source code in faststream/broker/publisher/usecase.py
def get_payloads(self) -> List[Tuple["AnyDict", str]]:
    payloads: List[Tuple[AnyDict, str]] = []

    if self.schema_:
        params = {"response__": (self.schema_, ...)}

        call_model: CallModel[Any, Any] = CallModel(
            call=lambda: None,
            model=create_model("Fake"),
            response_model=create_model(  # type: ignore[call-overload]
                "",
                __config__=get_config_base(),
                **params,
            ),
            params=params,
        )

        body = get_response_schema(
            call_model,
            prefix=f"{self.name}:Message",
        )
        if body:  # pragma: no branch
            payloads.append((body, ""))

    else:
        for call in self.calls:
            call_model = build_call_model(call)
            body = get_response_schema(
                call_model,
                prefix=f"{self.name}:Message",
            )
            if body:
                payloads.append((body, to_camelcase(unwrap(call).__name__)))

    return payloads

set_test #

set_test(*, mock, with_fake)

Turn publisher to testing mode.

Source code in faststream/broker/publisher/usecase.py
def set_test(
    self,
    *,
    mock: Annotated[
        MagicMock,
        Doc("Mock object to check in tests."),
    ],
    with_fake: Annotated[
        bool,
        Doc("Whetevet publisher's fake subscriber created or not."),
    ],
) -> None:
    """Turn publisher to testing mode."""
    self.mock = mock
    self._fake_handler = with_fake

reset_test #

reset_test()

Turn off publisher's testing mode.

Source code in faststream/broker/publisher/usecase.py
def reset_test(self) -> None:
    """Turn off publisher's testing mode."""
    self._fake_handler = False
    self.mock = None

get_name #

get_name()
Source code in faststream/nats/publisher/asyncapi.py
def get_name(self) -> str:
    return f"{self.subject}:Publisher"

get_schema #

get_schema()
Source code in faststream/nats/publisher/asyncapi.py
def get_schema(self) -> Dict[str, Channel]:
    payloads = self.get_payloads()

    return {
        self.name: Channel(
            description=self.description,
            publish=Operation(
                message=Message(
                    title=f"{self.name}:Message",
                    payload=resolve_payloads(payloads, "Publisher"),
                    correlationId=CorrelationId(
                        location="$message.header#/correlation_id"
                    ),
                ),
            ),
            bindings=ChannelBinding(
                nats=nats.ChannelBinding(
                    subject=self.subject,
                )
            ),
        )
    }

create classmethod #

create(*, subject, reply_to, headers, stream, timeout, broker_middlewares, middlewares, schema_, title_, description_, include_in_schema)
Source code in faststream/nats/publisher/asyncapi.py
@override
@classmethod
def create(  # type: ignore[override]
    cls,
    *,
    subject: str,
    reply_to: str,
    headers: Optional[Dict[str, str]],
    stream: Optional["JStream"],
    timeout: Optional[float],
    # Publisher args
    broker_middlewares: Iterable["BrokerMiddleware[Msg]"],
    middlewares: Iterable["PublisherMiddleware"],
    # AsyncAPI args
    schema_: Optional[Any],
    title_: Optional[str],
    description_: Optional[str],
    include_in_schema: bool,
) -> "AsyncAPIPublisher":
    return cls(
        subject=subject,
        reply_to=reply_to,
        headers=headers,
        stream=stream,
        timeout=timeout,
        # Publisher args
        broker_middlewares=broker_middlewares,
        middlewares=middlewares,
        # AsyncAPI args
        schema_=schema_,
        title_=title_,
        description_=description_,
        include_in_schema=include_in_schema,
    )