Skip to content

StreamVisitor

faststream.redis.testing.StreamVisitor #

Bases: Visitor

visit #

visit(*, sub, channel=None, list=None, stream=None)
Source code in faststream/redis/testing.py
def visit(
    self,
    *,
    sub: "LogicSubscriber",
    channel: Optional[str] = None,
    list: Optional[str] = None,
    stream: Optional[str] = None,
) -> Optional[str]:
    if stream is None or not isinstance(sub, _StreamHandlerMixin):
        return None

    if stream == sub.stream_sub.name:
        return stream

    return None

get_message #

get_message(channel, body, sub)
Source code in faststream/redis/testing.py
def get_message(  # type: ignore[override]
    self,
    channel: str,
    body: Any,
    sub: "_StreamHandlerMixin",
) -> Any:
    if sub.stream_sub.batch:
        return BatchStreamMessage(
            type="bstream",
            channel=channel,
            data=[{bDATA_KEY: body}],
            message_ids=[],
        )

    else:
        return DefaultStreamMessage(
            type="stream",
            channel=channel,
            data={bDATA_KEY: body},
            message_ids=[],
        )