Skip to content

Publisher

faststream.redis.asyncapi.Publisher #

Bases: LogicPublisher

A class to represent a Redis publisher.

calls class-attribute instance-attribute #

# Callables whose response schemas get_payloads() documents when the
# publisher has no explicit schema. Not an __init__ argument; starts as an
# empty list and is left out of the repr.
calls: List[Callable[..., Any]] = field(
    init=False, default_factory=list, repr=False
)

channel class-attribute instance-attribute #

# Default Pub/Sub destination: publish() falls back to it when its own
# `channel` argument is falsy.
channel: Optional[PubSub] = field(default=None)

channel_name property #

channel_name: str

description property #

description: Optional[str]

headers class-attribute instance-attribute #

# Default headers: publish() copies them and overlays any per-call headers.
headers: Optional[AnyDict] = field(default=None)

include_in_schema class-attribute instance-attribute #

# When False, schema() returns an empty mapping for this publisher.
include_in_schema: bool = field(default=True)

list class-attribute instance-attribute #

# Default list destination: publish() falls back to it when its own `list`
# argument is falsy.
list: Optional[ListSub] = field(default=None)

mock class-attribute instance-attribute #

# Test double installed by set_test() and cleared by reset_test().
# Not an __init__ argument and left out of the repr.
mock: Optional[MagicMock] = field(
    init=False, default=None, repr=False
)

name property #

name: str

reply_to class-attribute instance-attribute #

# Default reply destination: publish() uses it when its own `reply_to`
# argument is empty.
reply_to: str = field(default='')

stream class-attribute instance-attribute #

# Default stream destination: publish() falls back to it when its own
# `stream` argument is falsy.
stream: Optional[StreamSub] = field(default=None)

title class-attribute instance-attribute #

title: Optional[str] = field(default=None)

get_payloads #

get_payloads() -> List[Tuple[AnyDict, str]]
Source code in faststream/broker/publisher.py
def get_payloads(self) -> List[Tuple[AnyDict, str]]:
    """Collect the payload schemas this publisher can emit.

    When ``self._schema`` is set, it is wrapped in a throwaway call model
    (a no-op callable whose response model has a single required
    ``response__`` field of that type) and the resulting schema is returned
    under an empty name.

    Otherwise every callable in ``self.calls`` is turned into a call model,
    and each non-empty response schema is paired with the camel-cased name
    of the unwrapped callable.

    Returns:
        A list of ``(schema, name)`` tuples; empty when nothing produced a
        schema.
    """
    if self._schema:
        explicit_model: CallModel[Any, Any] = CallModel(
            call=lambda: None,
            model=create_model("Fake"),
            response_model=create_model(
                "",
                __config__=get_config_base(),  # type: ignore[arg-type]
                response__=(self._schema, ...),
            ),
        )
        explicit_body = get_response_schema(
            explicit_model,
            prefix=f"{self.name}:Message",
        )
        return [(explicit_body, "")] if explicit_body else []

    collected: List[Tuple[AnyDict, str]] = []
    for handler in self.calls:
        handler_body = get_response_schema(
            build_call_model(handler),
            prefix=f"{self.name}:Message",
        )
        if not handler_body:
            # Callables without a response schema contribute nothing.
            continue
        handler_name = to_camelcase(unwrap(handler).__name__)
        collected.append((handler_body, handler_name))

    return collected

publish async #

publish(
    message: SendableMessage,
    channel: Union[str, PubSub, None] = None,
    reply_to: str = "",
    headers: Optional[AnyDict] = None,
    correlation_id: Optional[str] = None,
    *,
    list: Union[str, ListSub, None] = None,
    stream: Union[str, StreamSub, None] = None,
    rpc: bool = False,
    rpc_timeout: Optional[float] = 30.0,
    raise_timeout: bool = False
) -> Optional[DecodedMessage]
Source code in faststream/redis/publisher.py
@override
async def publish(  # type: ignore[override]
    self,
    message: SendableMessage,
    channel: Union[str, PubSub, None] = None,
    reply_to: str = "",
    headers: Optional[AnyDict] = None,
    correlation_id: Optional[str] = None,
    *,
    list: Union[str, ListSub, None] = None,
    stream: Union[str, StreamSub, None] = None,
    rpc: bool = False,
    rpc_timeout: Optional[float] = 30.0,
    raise_timeout: bool = False,
) -> Optional[DecodedMessage]:
    """Send ``message`` through the connected producer.

    Args:
        message: Payload to send. When the resolved list destination has a
            truthy ``batch`` attribute, this is treated as a sequence and
            every element is passed to the producer's ``publish_batch``.
        channel: Pub/Sub destination; when falsy, ``self.channel`` is used.
        reply_to: Reply destination; when empty, ``self.reply_to`` is used.
        headers: Per-call headers laid over a copy of ``self.headers``.
        correlation_id: Forwarded unchanged to the producer.
        list: List destination; when falsy, ``self.list`` is used.
        stream: Stream destination; when falsy, ``self.stream`` is used.
        rpc: Forwarded unchanged to the producer.
        rpc_timeout: Forwarded unchanged to the producer.
        raise_timeout: Forwarded unchanged to the producer.

    Returns:
        Whatever the producer's ``publish`` returns, or ``None`` after a
        batch publish.

    Raises:
        AssertionError: If no producer is attached, or if none of channel,
            list or stream resolves to a destination.
    """
    assert self._producer, NOT_CONNECTED_YET  # nosec B101

    # Explicit arguments win; otherwise use the publisher's configured defaults.
    target_channel = PubSub.validate(channel or self.channel)
    target_list = ListSub.validate(list or self.list)
    target_stream = StreamSub.validate(stream or self.stream)

    assert any(  # nosec B101
        (target_channel, target_list, target_stream)
    ), "You have to specify outgoing channel"

    merged_headers = (self.headers or {}).copy()
    if headers is not None:
        merged_headers.update(headers)

    if not getattr(target_list, "batch", False):
        return await self._producer.publish(
            message=message,
            channel=getattr(target_channel, "name", None),
            list=getattr(target_list, "name", None),
            stream=getattr(target_stream, "name", None),
            reply_to=reply_to or self.reply_to,
            correlation_id=correlation_id,
            headers=merged_headers,
            rpc=rpc,
            rpc_timeout=rpc_timeout,
            raise_timeout=raise_timeout,
        )

    # Batch mode: only the messages and the list name are handed to the producer.
    await self._producer.publish_batch(
        *cast(Sequence[SendableMessage], message),
        list=target_list.name,  # type: ignore[union-attr]
    )
    return None

reset_test #

reset_test() -> None
Source code in faststream/broker/publisher.py
def reset_test(self) -> None:
    """Leave test mode: clear the fake-handler flag and drop the mock."""
    self._fake_handler, self.mock = False, None

schema #

schema() -> Dict[str, Channel]
Source code in faststream/redis/asyncapi.py
def schema(self) -> Dict[str, Channel]:
    """Build the channel entry describing this publisher.

    Returns:
        An empty mapping when ``include_in_schema`` is false; otherwise a
        one-item mapping from ``self.name`` to a ``Channel`` carrying the
        publish operation and the Redis channel binding.

    Raises:
        AssertionError: If none of ``list``, ``channel`` or ``stream`` is
            set.
    """
    if not self.include_in_schema:
        return {}

    payloads = self.get_payloads()

    # The first configured destination wins, checked as list, channel, stream.
    destinations = (
        (self.list, "rpush"),
        (self.channel, "publish"),
        (self.stream, "xadd"),
    )
    method = next(
        (command for target, command in destinations if target is not None),
        None,
    )
    if method is None:
        raise AssertionError("unreachable")

    publisher_name = self.name
    description = self.description

    message = Message(
        title=f"{publisher_name}:Message",
        payload=resolve_payloads(payloads, "Publisher"),
        correlationId=CorrelationId(
            location="$message.header#/correlation_id"
        ),
    )
    redis_binding = redis.ChannelBinding(
        channel=self.channel_name,
        method=method,
    )

    return {
        publisher_name: Channel(
            description=description,
            publish=Operation(message=message),
            bindings=ChannelBinding(redis=redis_binding),
        )
    }

set_test #

set_test(mock: MagicMock, with_fake: bool) -> None
Source code in faststream/broker/publisher.py
def set_test(
    self,
    mock: MagicMock,
    with_fake: bool,
) -> None:
    """Enter test mode.

    Args:
        mock: Object stored on ``self.mock``.
        with_fake: Value stored on ``self._fake_handler``.
    """
    self.mock, self._fake_handler = mock, with_fake