Skip to content

LogicPublisher

faststream.nats.publisher.LogicPublisher dataclass #

Bases: BasePublisher[Msg]

A class to represent a NATS publisher.

calls class-attribute instance-attribute #

calls: List[Callable[..., Any]] = field(
    init=False, default_factory=list, repr=False
)

description property #

description: Optional[str]

headers class-attribute instance-attribute #

headers: Optional[Dict[str, str]] = field(default=None)

include_in_schema class-attribute instance-attribute #

include_in_schema: bool = field(default=True)

mock class-attribute instance-attribute #

mock: Optional[MagicMock] = field(
    init=False, default=None, repr=False
)

reply_to class-attribute instance-attribute #

reply_to: str = field(default='')

stream class-attribute instance-attribute #

stream: Optional[JStream] = field(default=None)

subject class-attribute instance-attribute #

subject: str = field(default='')

timeout class-attribute instance-attribute #

timeout: Optional[float] = field(default=None)

title class-attribute instance-attribute #

title: Optional[str] = field(default=None)

get_payloads #

get_payloads() -> List[Tuple[AnyDict, str]]
Source code in faststream/broker/publisher.py
def get_payloads(self) -> List[Tuple[AnyDict, str]]:
    """Collect the response-body schemas this publisher can emit.

    When ``self._schema`` is set, a single payload is derived from it and
    paired with an empty handler name. Otherwise one payload is derived
    from each callable in ``self.calls`` and paired with the camel-cased
    name of the unwrapped callable.

    Returns:
        A list of ``(schema, handler_name)`` tuples; entries whose schema
        comes back empty from ``get_response_schema`` are left out.
    """
    collected: List[Tuple[AnyDict, str]] = []

    if not self._schema:
        # No explicit schema: inspect every registered callable instead.
        for handler in self.calls:
            schema_body = get_response_schema(
                build_call_model(handler),
                prefix=f"{self.name}:Message",
            )
            if not schema_body:
                continue
            handler_name = to_camelcase(unwrap(handler).__name__)
            collected.append((schema_body, handler_name))
        return collected

    # Explicit schema: wrap it in a throwaway call model so the same
    # response-schema helper can be reused.
    fake_model = create_model("Fake")
    response_model = create_model(
        "",
        __config__=get_config_base(),  # type: ignore[arg-type]
        response__=(self._schema, ...),
    )
    explicit_model: CallModel[Any, Any] = CallModel(
        call=lambda: None,
        model=fake_model,
        response_model=response_model,
    )

    schema_body = get_response_schema(
        explicit_model,
        prefix=f"{self.name}:Message",
    )
    if schema_body:  # pragma: no branch
        collected.append((schema_body, ""))

    return collected

name #

name: str

Returns the name of the API operation.

Source code in faststream/asyncapi/base.py
@abstractproperty
def name(self) -> str:
    """Return the name of the API operation.

    This is an abstract property: the base body only raises, so concrete
    classes are expected to provide their own implementation.

    Raises:
        NotImplementedError: Always, when the base implementation runs.
    """
    raise NotImplementedError

publish async #

publish(
    message: SendableMessage = "",
    reply_to: str = "",
    correlation_id: Optional[str] = None,
    headers: Optional[Dict[str, str]] = None,
    **producer_kwargs: Any
) -> Optional[DecodedMessage]
Source code in faststream/nats/publisher.py
@override
async def publish(  # type: ignore[override]
    self,
    message: SendableMessage = "",
    reply_to: str = "",
    correlation_id: Optional[str] = None,
    headers: Optional[Dict[str, str]] = None,
    **producer_kwargs: Any,
) -> Optional[DecodedMessage]:
    """Send ``message`` to this publisher's subject via its producer.

    Args:
        message: Payload handed to the producer.
        reply_to: Reply subject; ``self.reply_to`` is used when this is
            empty.
        correlation_id: Forwarded to the producer unchanged.
        headers: Message headers; ``self.headers`` is used when this is
            falsy.
        **producer_kwargs: Extra keyword arguments forwarded to the
            producer's ``publish`` call.

    Returns:
        Whatever the producer's ``publish`` coroutine returns.

    Raises:
        AssertionError: If no producer is attached yet or no subject is
            configured (only when assertions are enabled).
    """
    assert self._producer, NOT_CONNECTED_YET  # nosec B101
    assert self.subject, "You have to specify outgoing subject"  # nosec B101

    options: AnyDict = {"reply_to": reply_to or self.reply_to}

    # The stream name and timeout are only passed along when a stream is
    # configured on this publisher.
    if self.stream is not None:
        options["stream"] = self.stream.name
        options["timeout"] = self.timeout

    return await self._producer.publish(
        message=message,
        subject=self.subject,
        headers=headers or self.headers,
        correlation_id=correlation_id,
        **options,
        **producer_kwargs,
    )

reset_test #

reset_test() -> None
Source code in faststream/broker/publisher.py
def reset_test(self) -> None:
    """Clear the testing state: unset the fake-handler flag and drop the mock."""
    self._fake_handler, self.mock = False, None

schema #

schema() -> Dict[str, Channel]

Returns the schema of the API operation as a dictionary of channel names and channel objects.

Source code in faststream/asyncapi/base.py
def schema(self) -> Dict[str, Channel]:  # pragma: no cover
    """Return the operation's schema as a mapping of channel name to channel.

    This implementation always returns a new, empty mapping.
    """
    no_channels: Dict[str, Channel] = {}
    return no_channels

set_test #

set_test(mock: MagicMock, with_fake: bool) -> None
Source code in faststream/broker/publisher.py
def set_test(
    self,
    mock: MagicMock,
    with_fake: bool,
) -> None:
    """Store the testing state on this publisher.

    Args:
        mock: Object stored as ``self.mock``.
        with_fake: Value stored as ``self._fake_handler``.
    """
    self.mock, self._fake_handler = mock, with_fake