Skip to content

FakeProducer

faststream.redis.testing.FakeProducer #

FakeProducer(broker)

Bases: RedisFastProducer

Source code in faststream/redis/testing.py
def __init__(self, broker: RedisBroker) -> None:
    """Attach the fake producer to ``broker`` and set up its parser/decoder.

    ``broker._parser`` and ``broker._decoder`` are each handed to
    ``resolve_custom_func`` together with the matching method of a fresh
    ``RedisPubSubParser``; the results are stored on ``self._parser`` and
    ``self._decoder``.

    Args:
        broker: The broker whose subscribers this producer delivers to.
    """
    self.broker = broker

    # One parser instance supplies both fallback callables.
    fallback = RedisPubSubParser()
    self._parser = resolve_custom_func(broker._parser, fallback.parse_message)
    self._decoder = resolve_custom_func(broker._decoder, fallback.decode_message)

broker instance-attribute #

broker = broker

publish async #

publish(message, *, channel=None, list=None, stream=None, maxlen=None, headers=None, reply_to='', correlation_id=None, rpc=False, rpc_timeout=30.0, raise_timeout=False)
Source code in faststream/redis/testing.py
@override
async def publish(
    self,
    message: "SendableMessage",
    *,
    channel: Optional[str] = None,
    list: Optional[str] = None,
    stream: Optional[str] = None,
    maxlen: Optional[int] = None,
    headers: Optional["AnyDict"] = None,
    reply_to: str = "",
    correlation_id: Optional[str] = None,
    rpc: bool = False,
    rpc_timeout: Optional[float] = 30.0,
    raise_timeout: bool = False,
) -> Optional[Any]:
    """Deliver ``message`` directly to the broker's registered subscribers.

    The message is built once with ``build_message`` and then offered to
    every subscriber in ``self.broker._subscribers``. For each subscriber
    the channel, list and stream visitors are tried in that order; every
    subscriber/visitor pair that matches has its handler executed.

    Args:
        message: Payload to send.
        channel: Target channel name, if any.
        list: Target list name, if any.
        stream: Target stream name, if any.
        maxlen: Not used in this method body.
        headers: Extra headers passed to ``build_message``.
        reply_to: Reply destination passed to ``build_message``.
        correlation_id: Message correlation id; one is generated with
            ``gen_cor_id`` when a falsy value is given.
        rpc: When true, return the response of the first matching handler.
        rpc_timeout: Forwarded to ``timeout_scope`` around handler execution.
        raise_timeout: Forwarded to ``timeout_scope`` around handler execution.

    Returns:
        With ``rpc=True``, the first matching handler's response after it
        has gone through ``self._parser`` and ``self._decoder``; otherwise
        ``None``.

    Raises:
        WRONG_PUBLISH_ARGS: If both ``rpc`` and ``reply_to`` are set.
    """
    if rpc and reply_to:
        raise WRONG_PUBLISH_ARGS

    if not correlation_id:
        correlation_id = gen_cor_id()

    payload = build_message(
        message=message,
        reply_to=reply_to,
        correlation_id=correlation_id,
        headers=headers,
    )

    target_kwargs = _make_destionation_kwargs(channel, list, stream)
    visitors = (ChannelVisitor(), ListVisitor(), StreamVisitor())

    for subscriber in self.broker._subscribers.values():  # pragma: no branch
        for visitor in visitors:
            matched = visitor.visit(**target_kwargs, sub=subscriber)
            if not matched:
                continue

            incoming = visitor.get_message(
                matched,
                payload,
                subscriber,  # type: ignore[arg-type]
            )

            # The handler call and the response decoding both run inside
            # the timeout scope.
            with timeout_scope(rpc_timeout, raise_timeout):
                reply = await self._execute_handler(incoming, subscriber)
                if rpc:
                    parsed = await self._parser(reply)
                    return await self._decoder(parsed)

    return None

request async #

request(message, *, correlation_id, channel=None, list=None, stream=None, maxlen=None, headers=None, timeout=30.0)
Source code in faststream/redis/testing.py
@override
async def request(  # type: ignore[override]
    self,
    message: "SendableMessage",
    *,
    correlation_id: str,
    channel: Optional[str] = None,
    list: Optional[str] = None,
    stream: Optional[str] = None,
    maxlen: Optional[int] = None,
    headers: Optional["AnyDict"] = None,
    timeout: Optional[float] = 30.0,
) -> "PubSubMessage":
    """Send ``message`` to the first matching subscriber and return its result.

    Subscribers in ``self.broker._subscribers`` are scanned in order, and
    for each one the channel, list and stream visitors are tried in that
    order. The first subscriber/visitor pair that matches has its handler
    executed under ``anyio.fail_after(timeout)`` and the result is returned.

    Args:
        message: Payload to send.
        correlation_id: Message correlation id; one is generated with
            ``gen_cor_id`` when a falsy value is given.
        channel: Target channel name, if any.
        list: Target list name, if any.
        stream: Target stream name, if any.
        maxlen: Not used in this method body.
        headers: Extra headers passed to ``build_message``.
        timeout: Deadline, in seconds, passed to ``anyio.fail_after``.

    Returns:
        Whatever ``self._execute_handler`` returns for the matched handler.

    Raises:
        SubscriberNotFound: If no subscriber matches the destination.
    """
    if not correlation_id:
        correlation_id = gen_cor_id()

    payload = build_message(
        message=message,
        correlation_id=correlation_id,
        headers=headers,
    )

    target_kwargs = _make_destionation_kwargs(channel, list, stream)
    visitors = (ChannelVisitor(), ListVisitor(), StreamVisitor())

    for subscriber in self.broker._subscribers.values():  # pragma: no branch
        for visitor in visitors:
            matched = visitor.visit(**target_kwargs, sub=subscriber)
            if not matched:
                continue

            incoming = visitor.get_message(
                matched,
                payload,
                subscriber,  # type: ignore[arg-type]
            )

            with anyio.fail_after(timeout):
                return await self._execute_handler(incoming, subscriber)

    raise SubscriberNotFound

publish_batch async #

publish_batch(*msgs, list, headers=None, correlation_id=None)
Source code in faststream/redis/testing.py
async def publish_batch(
    self,
    *msgs: "SendableMessage",
    list: str,
    headers: Optional["AnyDict"] = None,
    correlation_id: Optional[str] = None,
) -> None:
    """Deliver ``msgs`` as one batch to batch-enabled list subscribers.

    Each message is built with ``build_message``. When ``correlation_id``
    is falsy, ``gen_cor_id`` is called separately for every message. The
    resulting batch is executed on every subscriber that the list visitor
    matches for ``list`` and whose ``list_sub.batch`` flag is truthy.

    Args:
        *msgs: Payloads to include in the batch.
        list: Name of the target list.
        headers: Extra headers passed to ``build_message`` for each message.
        correlation_id: Correlation id applied to each message when truthy.
    """
    batch = []
    for item in msgs:
        built = build_message(
            item,
            correlation_id=correlation_id or gen_cor_id(),
            headers=headers,
        )
        batch.append(built)

    list_visitor = ListVisitor()
    for subscriber in self.broker._subscribers.values():  # pragma: no branch
        if not list_visitor.visit(list=list, sub=subscriber):
            continue

        list_subscriber = cast(_ListHandlerMixin, subscriber)
        # Subscribers that are not in batch mode are skipped.
        if not list_subscriber.list_sub.batch:
            continue

        incoming = list_visitor.get_message(list, batch, list_subscriber)
        await self._execute_handler(incoming, subscriber)