Skip to content

create_subscriber

faststream.redis.subscriber.factory.create_subscriber #

create_subscriber(*, channel, list, stream, no_ack=False, no_reply=False, retry=False, broker_dependencies=(), broker_middlewares=(), title_=None, description_=None, include_in_schema=True)
Source code in faststream/redis/subscriber/factory.py
def create_subscriber(
    *,
    channel: Union["PubSub", str, None],
    list: Union["ListSub", str, None],
    stream: Union["StreamSub", str, None],
    # Subscriber args
    no_ack: bool = False,
    no_reply: bool = False,
    retry: bool = False,
    broker_dependencies: Iterable["Depends"] = (),
    broker_middlewares: Sequence["BrokerMiddleware[UnifyRedisDict]"] = (),
    # AsyncAPI args
    title_: Optional[str] = None,
    description_: Optional[str] = None,
    include_in_schema: bool = True,
) -> SubsciberType:
    validate_options(channel=channel, list=list, stream=stream)

    if (channel_sub := PubSub.validate(channel)) is not None:
        return AsyncAPIChannelSubscriber(
            channel=channel_sub,
            # basic args
            no_ack=no_ack,
            no_reply=no_reply,
            retry=retry,
            broker_dependencies=broker_dependencies,
            broker_middlewares=broker_middlewares,
            # AsyncAPI args
            title_=title_,
            description_=description_,
            include_in_schema=include_in_schema,
        )

    elif (stream_sub := StreamSub.validate(stream)) is not None:
        if stream_sub.batch:
            return AsyncAPIStreamBatchSubscriber(
                stream=stream_sub,
                # basic args
                no_ack=no_ack,
                no_reply=no_reply,
                retry=retry,
                broker_dependencies=broker_dependencies,
                broker_middlewares=broker_middlewares,
                # AsyncAPI args
                title_=title_,
                description_=description_,
                include_in_schema=include_in_schema,
            )
        else:
            return AsyncAPIStreamSubscriber(
                stream=stream_sub,
                # basic args
                no_ack=no_ack,
                no_reply=no_reply,
                retry=retry,
                broker_dependencies=broker_dependencies,
                broker_middlewares=broker_middlewares,
                # AsyncAPI args
                title_=title_,
                description_=description_,
                include_in_schema=include_in_schema,
            )

    elif (list_sub := ListSub.validate(list)) is not None:
        if list_sub.batch:
            return AsyncAPIListBatchSubscriber(
                list=list_sub,
                # basic args
                no_ack=no_ack,
                no_reply=no_reply,
                retry=retry,
                broker_dependencies=broker_dependencies,
                broker_middlewares=broker_middlewares,
                # AsyncAPI args
                title_=title_,
                description_=description_,
                include_in_schema=include_in_schema,
            )
        else:
            return AsyncAPIListSubscriber(
                list=list_sub,
                # basic args
                no_ack=no_ack,
                no_reply=no_reply,
                retry=retry,
                broker_dependencies=broker_dependencies,
                broker_middlewares=broker_middlewares,
                # AsyncAPI args
                title_=title_,
                description_=description_,
                include_in_schema=include_in_schema,
            )

    else:
        raise SetupError(INCORRECT_SETUP_MSG)