Skip to content

ChannelVisitor

faststream.redis.testing.ChannelVisitor #

Bases: Visitor

visit #

visit(*, sub, channel=None, list=None, stream=None)
Source code in faststream/redis/testing.py
def visit(
    self,
    *,
    sub: "LogicSubscriber",
    channel: Optional[str] = None,
    list: Optional[str] = None,
    stream: Optional[str] = None,
) -> Optional[str]:
    """Return ``channel`` if ``sub`` should receive a message published to it.

    Args:
        sub: Subscriber under inspection; anything that is not a
            ``ChannelSubscriber`` is rejected.
        channel: Name of the channel the message is published to.
        list: Accepted for signature compatibility; not used here.
        stream: Accepted for signature compatibility; not used here.

    Returns:
        ``channel`` when it equals the subscriber's channel name, or when the
        subscriber has a pattern and its channel name, read as a Redis glob,
        matches the *whole* of ``channel``. ``None`` otherwise.
    """
    if channel is None or not isinstance(sub, ChannelSubscriber):
        return None

    sub_channel = sub.channel

    # Exact subscription (also covers a glob published to verbatim).
    if channel == sub_channel.name:
        return channel

    if not sub_channel.pattern:
        return None

    def glob_token_to_regex(token: "re.Match[str]") -> str:
        text = token.group()
        if text == "*":
            return ".*"
        if text == "?":
            return "."
        if len(text) > 1 and text.startswith("["):
            # A glob character class is already valid regex syntax.
            return text
        # Either an escaped pair ("\x" stands for a literal x) or one plain
        # character; in both cases the last character is taken literally.
        return re.escape(text[-1])

    # Tokens, in priority order: backslash escape, [...] class, any character.
    glob_regex = re.sub(
        r"\\.|\[[^\]]+\]|.",
        glob_token_to_regex,
        sub_channel.name,
        flags=re.DOTALL,
    )

    # A glob has to cover the entire channel name; a prefix match would let
    # "*.log" accept "a.log.bak".
    if re.fullmatch(glob_regex, channel, flags=re.DOTALL):
        return channel

    return None

get_message #

get_message(channel, body, sub)
Source code in faststream/redis/testing.py
def get_message(  # type: ignore[override]
    self,
    channel: str,
    body: Any,
    sub: "ChannelSubscriber",
) -> Any:
    """Wrap ``body`` in a ``PubSubMessage`` addressed to ``channel``.

    Args:
        channel: Channel name stored on the message.
        body: Payload stored as the message ``data``.
        sub: Subscriber whose ``channel.pattern`` supplies the message pattern.

    Returns:
        A ``PubSubMessage`` of type ``"message"``. Its ``pattern`` is the
        subscriber's channel pattern encoded to bytes, or ``None`` when the
        subscriber has no pattern.
    """
    subscribed_pattern = sub.channel.pattern

    encoded_pattern = None
    if subscribed_pattern:
        encoded_pattern = subscribed_pattern.encode()

    return PubSubMessage(
        type="message",
        data=body,
        channel=channel,
        pattern=encoded_pattern,
    )