Skip to content

ListVisitor

faststream.redis.testing.ListVisitor #

Bases: Visitor

visit #

visit(*, sub, channel=None, list=None, stream=None)
Source code in faststream/redis/testing.py
def visit(
    self,
    *,
    sub: "LogicSubscriber",
    channel: Optional[str] = None,
    list: Optional[str] = None,
    stream: Optional[str] = None,
) -> Optional[str]:
    """Return ``list`` when ``sub`` is a list handler subscribed to that name.

    Args:
        sub: The subscriber being tested against the destination.
        channel: Accepted for signature compatibility; not used here.
        list: Name of the list being published to, if any.
        stream: Accepted for signature compatibility; not used here.

    Returns:
        The value of ``list`` if it is not ``None``, ``sub`` is a
        ``_ListHandlerMixin`` and ``sub.list_sub.name`` equals it;
        otherwise ``None``.
    """
    # NOTE: the ``list`` parameter shadows the builtin inside this method.
    # Checks are ordered so that ``sub.list_sub`` is only touched once the
    # subscriber is known to be a list handler.
    if (
        list is not None
        and isinstance(sub, _ListHandlerMixin)
        and list == sub.list_sub.name
    ):
        return list

    return None

get_message #

get_message(channel, body, sub)
Source code in faststream/redis/testing.py
def get_message(  # type: ignore[override]
    self,
    channel: str,
    body: Any,
    sub: "_ListHandlerMixin",
) -> Any:
    """Build the list message object to hand to ``sub``.

    Args:
        channel: Value stored in the message's ``channel`` field.
        body: Payload to deliver.
        sub: The list subscriber; ``sub.list_sub.batch`` selects the
            message flavour.

    Returns:
        A ``BatchListMessage`` (``type="blist"``) when the subscriber is in
        batch mode, otherwise a ``DefaultListMessage`` (``type="list"``)
        carrying ``body`` unchanged.
    """
    if not sub.list_sub.batch:
        return DefaultListMessage(
            type="list",
            channel=channel,
            data=body,
        )

    # Batch messages always carry a list: pass an existing list through
    # as-is and wrap any other payload in a one-element list.
    items = body if isinstance(body, List) else [body]
    return BatchListMessage(
        type="blist",
        channel=channel,
        data=items,
    )