Skip to content

FakeProducer

faststream.redis.test.FakeProducer #

FakeProducer(broker: RedisBroker)

Bases: RedisFastProducer

Source code in faststream/redis/test.py
def __init__(self, broker: RedisBroker) -> None:
    """Keep a reference to the broker whose handlers this producer dispatches to."""
    # Only the reference is stored here; nothing else is initialised.
    self.broker = broker

broker instance-attribute #

broker = broker

publish async #

publish(message: SendableMessage, channel: Optional[str] = None, reply_to: str = '', headers: Optional[AnyDict] = None, correlation_id: Optional[str] = None, *, list: Optional[str] = None, stream: Optional[str] = None, rpc: bool = False, rpc_timeout: Optional[float] = 30.0, raise_timeout: bool = False, maxlen: Optional[int] = None) -> Optional[Any]
Source code in faststream/redis/test.py
async def publish(
    self,
    message: SendableMessage,
    channel: Optional[str] = None,
    reply_to: str = "",
    headers: Optional[AnyDict] = None,
    correlation_id: Optional[str] = None,
    *,
    list: Optional[str] = None,
    stream: Optional[str] = None,
    rpc: bool = False,
    rpc_timeout: Optional[float] = 30.0,
    raise_timeout: bool = False,
    maxlen: Optional[int] = None,
) -> Optional[Any]:
    """Dispatch ``message`` to every handler of ``self.broker`` that matches the destination.

    The destination is given by ``channel``, ``list`` or ``stream``. Each handler
    registered in ``self.broker.handlers`` is checked against it and, on a match,
    invoked through ``call_handler`` with a message built by ``build_message``.

    Args:
        message: Payload to deliver. It is wrapped in a one-element list when the
            matched list/stream subscription has ``batch`` set.
        channel: Channel name, compared with each handler's ``channel``.
        reply_to: Forwarded to ``build_message``.
        headers: Forwarded to ``build_message``.
        correlation_id: Forwarded to ``build_message``.
        list: List name, compared with each handler's ``list_sub.name``.
        stream: Stream name, compared with each handler's ``stream_sub.name``.
        rpc: When true, the result of the first matching handler is returned
            immediately and remaining handlers are not visited.
        rpc_timeout: Forwarded to ``call_handler``.
        raise_timeout: Forwarded to ``call_handler``.
        maxlen: Accepted for signature compatibility; not used by this method.

    Returns:
        The value returned by ``call_handler`` for the first matching handler
        when ``rpc`` is true, otherwise ``None``.

    Raises:
        ValueError: If none of ``channel``, ``list`` or ``stream`` is provided.
    """
    any_of = channel or list or stream
    if any_of is None:
        raise ValueError(INCORRECT_SETUP_MSG)

    for handler in self.broker.handlers.values():  # pragma: no branch
        call = False
        batch = False

        if channel and (ch := handler.channel) is not None:
            if ch.pattern:
                # Glob-style subscription: translate ``*`` to ``.*`` and require
                # the WHOLE channel name to match. A prefix-only ``re.match``
                # would let ``*.log`` fire for ``app.logger``, which a Redis
                # pattern subscription would not receive.
                call = bool(
                    re.fullmatch(
                        ch.name.replace(".", "\\.").replace("*", ".*"),
                        channel,
                    )
                )
            else:
                # Plain subscription: exact name comparison.
                call = ch.name == channel

        if list and (ls := handler.list_sub) is not None:
            batch = ls.batch
            call = list == ls.name

        if stream and (st := handler.stream_sub) is not None:
            batch = st.batch
            call = stream == st.name

        if call:
            r = await call_handler(
                handler=handler,
                message=build_message(
                    # Batch subscribers receive a list of messages.
                    message=[message] if batch else message,
                    channel=any_of,
                    headers=headers,
                    correlation_id=correlation_id,
                    reply_to=reply_to,
                ),
                rpc=rpc,
                rpc_timeout=rpc_timeout,
                raise_timeout=raise_timeout,
            )

            if rpc:  # pragma: no branch
                return r

    return None

publish_batch async #

publish_batch(*msgs: SendableMessage, list: str) -> None
Source code in faststream/redis/test.py
async def publish_batch(
    self,
    *msgs: SendableMessage,
    list: str,
) -> None:
    """Deliver ``msgs`` as one message to every handler subscribed to ``list``.

    Args:
        *msgs: Payloads passed together (as a tuple) to ``build_message``.
        list: List name; only handlers whose ``list_sub.name`` equals it are called.
    """
    for subscriber in self.broker.handlers.values():  # pragma: no branch
        # Skip handlers with no list subscription or a different list name.
        if not (subscriber.list_sub and subscriber.list_sub.name == list):
            continue

        batch_message = build_message(message=msgs, channel=list)
        await call_handler(handler=subscriber, message=batch_message)