def create_subscriber(
*,
channel: Union["PubSub", str, None],
list: Union["ListSub", str, None],
stream: Union["StreamSub", str, None],
# Subscriber args
no_ack: bool = False,
no_reply: bool = False,
retry: bool = False,
broker_dependencies: Iterable["Depends"] = (),
broker_middlewares: Sequence["BrokerMiddleware[UnifyRedisDict]"] = (),
# AsyncAPI args
title_: Optional[str] = None,
description_: Optional[str] = None,
include_in_schema: bool = True,
) -> SubsciberType:
validate_options(channel=channel, list=list, stream=stream)
if (channel_sub := PubSub.validate(channel)) is not None:
return AsyncAPIChannelSubscriber(
channel=channel_sub,
# basic args
no_ack=no_ack,
no_reply=no_reply,
retry=retry,
broker_dependencies=broker_dependencies,
broker_middlewares=broker_middlewares,
# AsyncAPI args
title_=title_,
description_=description_,
include_in_schema=include_in_schema,
)
elif (stream_sub := StreamSub.validate(stream)) is not None:
if stream_sub.batch:
return AsyncAPIStreamBatchSubscriber(
stream=stream_sub,
# basic args
no_ack=no_ack,
no_reply=no_reply,
retry=retry,
broker_dependencies=broker_dependencies,
broker_middlewares=broker_middlewares,
# AsyncAPI args
title_=title_,
description_=description_,
include_in_schema=include_in_schema,
)
else:
return AsyncAPIStreamSubscriber(
stream=stream_sub,
# basic args
no_ack=no_ack,
no_reply=no_reply,
retry=retry,
broker_dependencies=broker_dependencies,
broker_middlewares=broker_middlewares,
# AsyncAPI args
title_=title_,
description_=description_,
include_in_schema=include_in_schema,
)
elif (list_sub := ListSub.validate(list)) is not None:
if list_sub.batch:
return AsyncAPIListBatchSubscriber(
list=list_sub,
# basic args
no_ack=no_ack,
no_reply=no_reply,
retry=retry,
broker_dependencies=broker_dependencies,
broker_middlewares=broker_middlewares,
# AsyncAPI args
title_=title_,
description_=description_,
include_in_schema=include_in_schema,
)
else:
return AsyncAPIListSubscriber(
list=list_sub,
# basic args
no_ack=no_ack,
no_reply=no_reply,
retry=retry,
broker_dependencies=broker_dependencies,
broker_middlewares=broker_middlewares,
# AsyncAPI args
title_=title_,
description_=description_,
include_in_schema=include_in_schema,
)
else:
raise SetupError(INCORRECT_SETUP_MSG)