def create_subscriber(
    *,
    subject: str,
    queue: str,
    pending_msgs_limit: Optional[int],
    pending_bytes_limit: Optional[int],
    # Core args
    max_msgs: int,
    # JS args
    durable: Optional[str],
    config: Optional["api.ConsumerConfig"],
    ordered_consumer: bool,
    idle_heartbeat: Optional[float],
    flow_control: Optional[bool],
    deliver_policy: Optional["api.DeliverPolicy"],
    headers_only: Optional[bool],
    # pull args
    pull_sub: Optional["PullSub"],
    kv_watch: Optional["KvWatch"],
    obj_watch: Optional["ObjWatch"],
    inbox_prefix: bytes,
    # custom args
    ack_first: bool,
    max_workers: int,
    stream: Optional["JStream"],
    # Subscriber args
    no_ack: bool,
    no_reply: bool,
    retry: Union[bool, int],
    broker_dependencies: Iterable["Depends"],
    broker_middlewares: Sequence["BrokerMiddleware[Any]"],
    # AsyncAPI information
    title_: Optional[str],
    description_: Optional[str],
    include_in_schema: bool,
) -> Union[
    "AsyncAPICoreSubscriber",
    "AsyncAPIConcurrentCoreSubscriber",
    "AsyncAPIStreamSubscriber",
    "AsyncAPIConcurrentPushStreamSubscriber",
    "AsyncAPIPullStreamSubscriber",
    "AsyncAPIConcurrentPullStreamSubscriber",
    "AsyncAPIBatchPullStreamSubscriber",
    "AsyncAPIKeyValueWatchSubscriber",
    "AsyncAPIObjStoreWatchSubscriber",
]:
    """Pick and instantiate the subscriber class matching the given options.

    The arguments are first handed to ``_validate_input_for_misconfigure``
    (presumably it rejects or warns about conflicting combinations -- its
    body is not visible from here).

    Selection order:

    1. ``obj_watch`` set                 -> ``AsyncAPIObjStoreWatchSubscriber``
    2. ``kv_watch`` set                  -> ``AsyncAPIKeyValueWatchSubscriber``
    3. ``stream is None``                -> core subscriber
       (concurrent variant when ``max_workers > 1``)
    4. ``stream`` set, no ``pull_sub``   -> push stream subscriber
       (concurrent variant when ``max_workers > 1``)
    5. ``stream`` and ``pull_sub`` set   -> concurrent pull subscriber when
       ``max_workers > 1``; otherwise batch pull if ``pull_sub.batch`` is
       truthy, else plain pull.

    Note:
        When a ``config`` object is passed in, it is updated in place:
        ``durable_name``, ``idle_heartbeat`` and ``headers_only`` are filled
        from the matching arguments if they are ``None``, and
        ``deliver_policy`` is replaced when it is ``DeliverPolicy.ALL``.
        Falsy ``pending_msgs_limit`` / ``pending_bytes_limit`` values
        (``None`` as well as ``0``) fall back to the module defaults.

    Returns:
        The constructed subscriber instance.
    """
    _validate_input_for_misconfigure(
        subject=subject,
        queue=queue,
        pending_msgs_limit=pending_msgs_limit,
        pending_bytes_limit=pending_bytes_limit,
        max_msgs=max_msgs,
        durable=durable,
        config=config,
        ordered_consumer=ordered_consumer,
        idle_heartbeat=idle_heartbeat,
        flow_control=flow_control,
        deliver_policy=deliver_policy,
        headers_only=headers_only,
        pull_sub=pull_sub,
        kv_watch=kv_watch,
        obj_watch=obj_watch,
        ack_first=ack_first,
        max_workers=max_workers,
        stream=stream,
    )

    config = config or ConsumerConfig(filter_subjects=[])

    # Explicit config values win; the keyword arguments only fill the gaps.
    for field_name, fallback in (
        ("durable_name", durable),
        ("idle_heartbeat", idle_heartbeat),
        ("headers_only", headers_only),
    ):
        if getattr(config, field_name) is None:
            setattr(config, field_name, fallback)

    if config.deliver_policy is DeliverPolicy.ALL:
        config.deliver_policy = deliver_policy or DeliverPolicy.ALL

    if stream:
        # Options shared by both JetStream subscriber flavours.
        extra_options: AnyDict = {
            "pending_msgs_limit": (
                pending_msgs_limit or DEFAULT_JS_SUB_PENDING_MSGS_LIMIT
            ),
            "pending_bytes_limit": (
                pending_bytes_limit or DEFAULT_JS_SUB_PENDING_BYTES_LIMIT
            ),
            "durable": durable,
            "stream": stream.name,
        }

        if pull_sub is None:
            # JetStream push subscriber
            extra_options["ordered_consumer"] = ordered_consumer
            extra_options["idle_heartbeat"] = idle_heartbeat
            extra_options["flow_control"] = flow_control
            extra_options["deliver_policy"] = deliver_policy
            extra_options["headers_only"] = headers_only
            extra_options["manual_ack"] = not ack_first
        else:
            # JetStream pull subscriber
            extra_options["inbox_prefix"] = inbox_prefix

    else:
        # Core subscriber
        extra_options = {
            "pending_msgs_limit": (
                pending_msgs_limit or DEFAULT_SUB_PENDING_MSGS_LIMIT
            ),
            "pending_bytes_limit": (
                pending_bytes_limit or DEFAULT_SUB_PENDING_BYTES_LIMIT
            ),
            "max_msgs": max_msgs,
        }

    # Keyword arguments accepted by every subscriber class built below.
    shared_kwargs: AnyDict = {
        "subject": subject,
        "config": config,
        "broker_dependencies": broker_dependencies,
        "broker_middlewares": broker_middlewares,
        # AsyncAPI information
        "title_": title_,
        "description_": description_,
        "include_in_schema": include_in_schema,
    }

    if obj_watch is not None:
        return AsyncAPIObjStoreWatchSubscriber(
            obj_watch=obj_watch,
            **shared_kwargs,
        )

    if kv_watch is not None:
        return AsyncAPIKeyValueWatchSubscriber(
            kv_watch=kv_watch,
            **shared_kwargs,
        )

    # Everything except the watchers also takes the message-handling options.
    handler_kwargs: AnyDict = {
        **shared_kwargs,
        # basic args
        "extra_options": extra_options,
        # Subscriber args
        "no_ack": no_ack,
        "no_reply": no_reply,
        "retry": retry,
    }

    is_concurrent = max_workers > 1

    if stream is None:
        if is_concurrent:
            return AsyncAPIConcurrentCoreSubscriber(
                max_workers=max_workers,
                queue=queue,
                **handler_kwargs,
            )
        return AsyncAPICoreSubscriber(
            queue=queue,
            **handler_kwargs,
        )

    if pull_sub is None:
        if is_concurrent:
            return AsyncAPIConcurrentPushStreamSubscriber(
                max_workers=max_workers,
                stream=stream,
                queue=queue,
                **handler_kwargs,
            )
        return AsyncAPIStreamSubscriber(
            stream=stream,
            queue=queue,
            **handler_kwargs,
        )

    if is_concurrent:
        # The concurrent pull subscriber is chosen regardless of `pull_sub.batch`.
        return AsyncAPIConcurrentPullStreamSubscriber(
            max_workers=max_workers,
            pull_sub=pull_sub,
            stream=stream,
            **handler_kwargs,
        )

    if pull_sub.batch:
        return AsyncAPIBatchPullStreamSubscriber(
            pull_sub=pull_sub,
            stream=stream,
            **handler_kwargs,
        )

    return AsyncAPIPullStreamSubscriber(
        pull_sub=pull_sub,
        stream=stream,
        **handler_kwargs,
    )