Visitor

faststream.redis.testing.Visitor

Bases: Protocol

visit

visit(*, channel, list, stream, sub)
Source code in faststream/redis/testing.py
def visit(
    self,
    *,
    channel: Optional[str],
    list: Optional[str],
    stream: Optional[str],
    sub: "LogicSubscriber",
) -> Optional[str]: ...

get_message

get_message(channel, body, sub)
Source code in faststream/redis/testing.py
def get_message(self, channel: str, body: Any, sub: "LogicSubscriber") -> Any: ...
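The two methods above only declare signatures, so a small sketch may help show how a class could satisfy the protocol structurally. This is not FastStream's implementation: `FakeSubscriber`, `ExactChannelVisitor`, and `dispatch` are hypothetical names, the `Visitor` protocol is redeclared locally so the snippet runs without FastStream installed, and the assumption that `visit` returns the matched name (or `None` when the subscriber does not match) is inferred from the `Optional[str]` return type alone. The message shape returned by `get_message` is likewise invented for illustration.

```python
from dataclasses import dataclass
from typing import Any, List, Optional, Protocol


@dataclass
class FakeSubscriber:
    """Hypothetical stand-in for LogicSubscriber; it only stores a channel name."""

    channel_name: str


class Visitor(Protocol):
    """Local copy of the protocol's shape, so FastStream is not required here."""

    def visit(
        self,
        *,
        channel: Optional[str],
        list: Optional[str],
        stream: Optional[str],
        sub: Any,
    ) -> Optional[str]: ...

    def get_message(self, channel: str, body: Any, sub: Any) -> Any: ...


class ExactChannelVisitor:
    """Matches a subscriber only when the published channel equals its channel."""

    def visit(self, *, channel, list, stream, sub):
        # Assumption: return the matched name on success, None otherwise.
        if channel is not None and channel == sub.channel_name:
            return channel
        return None

    def get_message(self, channel, body, sub):
        # Assumption: an invented message shape, used for illustration only.
        return {"type": "message", "channel": channel, "data": body}


def dispatch(
    visitors: List[Visitor],
    subscribers: List[FakeSubscriber],
    *,
    channel: Optional[str] = None,
    list_name: Optional[str] = None,
    stream: Optional[str] = None,
    body: Any = None,
) -> List[Any]:
    """Hypothetical driver: ask each visitor whether a subscriber matches."""
    delivered = []
    for sub in subscribers:
        for visitor in visitors:
            matched = visitor.visit(
                channel=channel, list=list_name, stream=stream, sub=sub
            )
            if matched is not None:
                delivered.append(visitor.get_message(matched, body, sub))
                break  # the first matching visitor handles this subscriber
    return delivered
```

Because `Visitor` is a `Protocol`, `ExactChannelVisitor` does not need to inherit from it; a type checker accepts any class whose methods match the two signatures. Calling `dispatch([ExactChannelVisitor()], [FakeSubscriber("orders")], channel="orders", body=b"hi")` would build one message for the matching subscriber.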