Skip to content

KafkaRegistrator

faststream.confluent.broker.registrator.KafkaRegistrator #

KafkaRegistrator(*, prefix, dependencies, middlewares, parser, decoder, include_in_schema)

Bases: ABCBroker[Union['Message', Tuple['Message', ...]]]

Includable to KafkaBroker router.

Source code in faststream/broker/core/abc.py
def __init__(
    self,
    *,
    prefix: str,
    dependencies: Iterable["Depends"],
    middlewares: Sequence["BrokerMiddleware[MsgType]"],
    parser: Optional["CustomCallable"],
    decoder: Optional["CustomCallable"],
    include_in_schema: Optional[bool],
) -> None:
    """Remember the object-level settings and start with empty registries.

    Args:
        prefix: Stored as ``self.prefix``.
        dependencies: Stored as ``self._dependencies``.
        middlewares: Stored as ``self._middlewares``.
        parser: Stored as ``self._parser``.
        decoder: Stored as ``self._decoder``.
        include_in_schema: Stored as ``self.include_in_schema``.
    """
    # Both registries start out empty; each gets its own dict object.
    self._subscribers, self._publishers = {}, {}

    # Public settings.
    self.include_in_schema = include_in_schema
    self.prefix = prefix

    # Private defaults, kept exactly as passed in (no copies are made).
    self._decoder = decoder
    self._parser = parser
    self._middlewares = middlewares
    self._dependencies = dependencies

prefix instance-attribute #

prefix = prefix

include_in_schema instance-attribute #

include_in_schema = include_in_schema

add_middleware #

add_middleware(middleware)

Append BrokerMiddleware to the end of middlewares list.

The new middleware will be used as the innermost of the already existing ones.

Source code in faststream/broker/core/abc.py
def add_middleware(self, middleware: "BrokerMiddleware[MsgType]") -> None:
    """Append a BrokerMiddleware to the end of the middlewares list.

    The new middleware becomes the innermost of the already existing ones.
    It is also handed to every subscriber and publisher currently registered
    on this object through their own ``add_middleware``.
    """
    # Rebuild the collection as a new tuple instead of mutating it in place.
    self._middlewares = tuple(self._middlewares) + (middleware,)

    # Subscribers are notified first, then publishers.
    for registry in (self._subscribers, self._publishers):
        for endpoint in registry.values():
            endpoint.add_middleware(middleware)

include_router #

include_router(router, *, prefix='', dependencies=(), middlewares=(), include_in_schema=None)

Includes a router in the current object.

Source code in faststream/broker/core/abc.py
def include_router(
    self,
    router: "ABCBroker[Any]",
    *,
    prefix: str = "",
    dependencies: Iterable["Depends"] = (),
    middlewares: Iterable["BrokerMiddleware[MsgType]"] = (),
    include_in_schema: Optional[bool] = None,
) -> None:
    """Includes a router in the current object.

    Every subscriber and publisher found in ``router`` gets the combined
    prefix applied and, unless an entry with the same hash is already
    registered here, is adopted into this object's registries.

    Args:
        router: Object whose ``_subscribers`` and ``_publishers`` are adopted.
        prefix: Extra prefix appended to ``self.prefix``; the combined value
            is applied to both subscribers and publishers of ``router``.
        dependencies: Placed after ``self._dependencies`` and before each
            adopted subscriber's own broker dependencies.
        middlewares: Placed after ``self._middlewares`` and before each
            adopted endpoint's own broker middlewares.
        include_in_schema: When not ``None``, overrides ``include_in_schema``
            on each adopted endpoint; otherwise the endpoint's current value
            is resolved through ``self._solve_include_in_schema``.
    """
    # Materialize once: both arguments are unpacked for every adopted
    # endpoint, so a one-shot iterator would otherwise be exhausted after
    # the first one and silently skipped for the rest.
    dependencies = tuple(dependencies)
    middlewares = tuple(middlewares)

    # Subscribers and publishers must end up with the same prefix.
    full_prefix = "".join((self.prefix, prefix))

    for h in router._subscribers.values():
        h.add_prefix(full_prefix)

        if (key := hash(h)) not in self._subscribers:
            if include_in_schema is None:
                h.include_in_schema = self._solve_include_in_schema(
                    h.include_in_schema
                )
            else:
                h.include_in_schema = include_in_schema

            h._broker_middlewares = (
                *self._middlewares,
                *middlewares,
                *h._broker_middlewares,
            )
            h._broker_dependencies = (
                *self._dependencies,
                *dependencies,
                *h._broker_dependencies,
            )
            # Rebind to a new dict rather than mutating the existing one.
            self._subscribers = {**self._subscribers, key: h}

    for p in router._publishers.values():
        p.add_prefix(full_prefix)

        if (key := hash(p)) not in self._publishers:
            if include_in_schema is None:
                p.include_in_schema = self._solve_include_in_schema(
                    p.include_in_schema
                )
            else:
                p.include_in_schema = include_in_schema

            p._broker_middlewares = (
                *self._middlewares,
                *middlewares,
                *p._broker_middlewares,
            )
            self._publishers = {**self._publishers, key: p}

include_routers #

include_routers(*routers)

Includes routers in the object.

Source code in faststream/broker/core/abc.py
def include_routers(self, *routers: "ABCBroker[MsgType]") -> None:
    """Include each of the given routers, one after another.

    Every router is passed to ``self.include_router`` with that method's
    default keyword arguments, in the order the routers were supplied.
    """
    for router in routers:
        self.include_router(router)

subscriber #

subscriber(*topics: str, partitions: Sequence[TopicPartition] = (), polling_interval: float = 0.1, group_id: Optional[str] = None, group_instance_id: Optional[str] = None, fetch_max_wait_ms: int = 500, fetch_max_bytes: int = 50 * 1024 * 1024, fetch_min_bytes: int = 1, max_partition_fetch_bytes: int = 1 * 1024 * 1024, auto_offset_reset: Literal['latest', 'earliest', 'none'] = 'latest', auto_commit: bool = True, auto_commit_interval_ms: int = 5 * 1000, check_crcs: bool = True, partition_assignment_strategy: Sequence[str] = ('roundrobin',), max_poll_interval_ms: int = 5 * 60 * 1000, session_timeout_ms: int = 10 * 1000, heartbeat_interval_ms: int = 3 * 1000, isolation_level: Literal['read_uncommitted', 'read_committed'] = 'read_uncommitted', batch: Literal[True], max_records: Optional[int] = None, dependencies: Iterable[Depends] = (), parser: Optional[CustomCallable] = None, decoder: Optional[CustomCallable] = None, middlewares: Sequence[SubscriberMiddleware[KafkaMessage]] = (), filter: Filter[KafkaMessage] = default_filter, retry: bool = False, no_ack: bool = False, no_reply: bool = False, title: Optional[str] = None, description: Optional[str] = None, include_in_schema: bool = True) -> AsyncAPIBatchSubscriber
subscriber(*topics: str, partitions: Sequence[TopicPartition] = (), polling_interval: float = 0.1, group_id: Optional[str] = None, group_instance_id: Optional[str] = None, fetch_max_wait_ms: int = 500, fetch_max_bytes: int = 50 * 1024 * 1024, fetch_min_bytes: int = 1, max_partition_fetch_bytes: int = 1 * 1024 * 1024, auto_offset_reset: Literal['latest', 'earliest', 'none'] = 'latest', auto_commit: bool = True, auto_commit_interval_ms: int = 5 * 1000, check_crcs: bool = True, partition_assignment_strategy: Sequence[str] = ('roundrobin',), max_poll_interval_ms: int = 5 * 60 * 1000, session_timeout_ms: int = 10 * 1000, heartbeat_interval_ms: int = 3 * 1000, isolation_level: Literal['read_uncommitted', 'read_committed'] = 'read_uncommitted', batch: Literal[False] = False, max_records: Optional[int] = None, dependencies: Iterable[Depends] = (), parser: Optional[CustomCallable] = None, decoder: Optional[CustomCallable] = None, middlewares: Sequence[SubscriberMiddleware[KafkaMessage]] = (), filter: Filter[KafkaMessage] = default_filter, retry: bool = False, no_ack: bool = False, no_reply: bool = False, title: Optional[str] = None, description: Optional[str] = None, include_in_schema: bool = True) -> AsyncAPIDefaultSubscriber
subscriber(*topics: str, partitions: Sequence[TopicPartition] = (), polling_interval: float = 0.1, group_id: Optional[str] = None, group_instance_id: Optional[str] = None, fetch_max_wait_ms: int = 500, fetch_max_bytes: int = 50 * 1024 * 1024, fetch_min_bytes: int = 1, max_partition_fetch_bytes: int = 1 * 1024 * 1024, auto_offset_reset: Literal['latest', 'earliest', 'none'] = 'latest', auto_commit: bool = True, auto_commit_interval_ms: int = 5 * 1000, check_crcs: bool = True, partition_assignment_strategy: Sequence[str] = ('roundrobin',), max_poll_interval_ms: int = 5 * 60 * 1000, session_timeout_ms: int = 10 * 1000, heartbeat_interval_ms: int = 3 * 1000, isolation_level: Literal['read_uncommitted', 'read_committed'] = 'read_uncommitted', batch: bool = False, max_records: Optional[int] = None, dependencies: Iterable[Depends] = (), parser: Optional[CustomCallable] = None, decoder: Optional[CustomCallable] = None, middlewares: Sequence[SubscriberMiddleware[KafkaMessage]] = (), filter: Filter[KafkaMessage] = default_filter, retry: bool = False, no_ack: bool = False, no_reply: bool = False, title: Optional[str] = None, description: Optional[str] = None, include_in_schema: bool = True) -> Union[AsyncAPIDefaultSubscriber, AsyncAPIBatchSubscriber]
subscriber(*topics, partitions=(), polling_interval=0.1, group_id=None, group_instance_id=None, fetch_max_wait_ms=500, fetch_max_bytes=50 * 1024 * 1024, fetch_min_bytes=1, max_partition_fetch_bytes=1 * 1024 * 1024, auto_offset_reset='latest', auto_commit=True, auto_commit_interval_ms=5 * 1000, check_crcs=True, partition_assignment_strategy=('roundrobin',), max_poll_interval_ms=5 * 60 * 1000, session_timeout_ms=10 * 1000, heartbeat_interval_ms=3 * 1000, isolation_level='read_uncommitted', batch=False, max_records=None, dependencies=(), parser=None, decoder=None, middlewares=(), filter=default_filter, retry=False, no_ack=False, no_reply=False, title=None, description=None, include_in_schema=True)
Source code in faststream/confluent/broker/registrator.py
 911
 912
 913
 914
 915
 916
 917
 918
 919
 920
 921
 922
 923
 924
 925
 926
 927
 928
 929
 930
 931
 932
 933
 934
 935
 936
 937
 938
 939
 940
 941
 942
 943
 944
 945
 946
 947
 948
 949
 950
 951
 952
 953
 954
 955
 956
 957
 958
 959
 960
 961
 962
 963
 964
 965
 966
 967
 968
 969
 970
 971
 972
 973
 974
 975
 976
 977
 978
 979
 980
 981
 982
 983
 984
 985
 986
 987
 988
 989
 990
 991
 992
 993
 994
 995
 996
 997
 998
 999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
@override
def subscriber(
    self,
    *topics: Annotated[
        str,
        Doc("Kafka topics to consume messages from."),
    ],
    partitions: Sequence["TopicPartition"] = (),
    polling_interval: float = 0.1,
    group_id: Annotated[
        Optional[str],
        Doc(
            """
        Name of the consumer group to join for dynamic
        partition assignment (if enabled), and to use for fetching and
        committing offsets. If `None`, auto-partition assignment (via
        group coordinator) and offset commits are disabled.
        """
        ),
    ] = None,
    group_instance_id: Annotated[
        Optional[str],
        Doc(
            """
        A unique string that identifies the consumer instance.
        If set, the consumer is treated as a static member of the group
        and does not participate in consumer group management (e.g.
        partition assignment, rebalances). This can be used to assign
        partitions to specific consumers, rather than letting the group
        assign partitions based on consumer metadata.
        """
        ),
    ] = None,
    fetch_max_wait_ms: Annotated[
        int,
        Doc(
            """
        The maximum amount of time in milliseconds
        the server will block before answering the fetch request if
        there isn't sufficient data to immediately satisfy the
        requirement given by `fetch_min_bytes`.
        """
        ),
    ] = 500,
    fetch_max_bytes: Annotated[
        int,
        Doc(
            """
        The maximum amount of data the server should
        return for a fetch request. This is not an absolute maximum, if
        the first message in the first non-empty partition of the fetch
        is larger than this value, the message will still be returned
        to ensure that the consumer can make progress. NOTE: consumer
        performs fetches to multiple brokers in parallel so memory
        usage will depend on the number of brokers containing
        partitions for the topic.
        """
        ),
    ] = 50 * 1024 * 1024,
    fetch_min_bytes: Annotated[
        int,
        Doc(
            """
        Minimum amount of data the server should
        return for a fetch request, otherwise wait up to
        `fetch_max_wait_ms` for more data to accumulate.
        """
        ),
    ] = 1,
    max_partition_fetch_bytes: Annotated[
        int,
        Doc(
            """
        The maximum amount of data
        per-partition the server will return. The maximum total memory
        used for a request ``= #partitions * max_partition_fetch_bytes``.
        This size must be at least as large as the maximum message size
        the server allows or else it is possible for the producer to
        send messages larger than the consumer can fetch. If that
        happens, the consumer can get stuck trying to fetch a large
        message on a certain partition.
        """
        ),
    ] = 1 * 1024 * 1024,
    auto_offset_reset: Annotated[
        Literal["latest", "earliest", "none"],
        Doc(
            """
        A policy for resetting offsets on `OffsetOutOfRangeError` errors:

        * `earliest` will move to the oldest available message
        * `latest` will move to the most recent
        * `none` will raise an exception so you can handle this case
        """
        ),
    ] = "latest",
    auto_commit: Annotated[
        bool,
        Doc(
            """
        If `True` the consumer's offset will be
        periodically committed in the background.
        """
        ),
    ] = True,
    auto_commit_interval_ms: Annotated[
        int,
        Doc(
            """
        Milliseconds between automatic
        offset commits, if `auto_commit` is `True`."""
        ),
    ] = 5 * 1000,
    check_crcs: Annotated[
        bool,
        Doc(
            """
        Automatically check the CRC32 of the records
        consumed. This ensures no on-the-wire or on-disk corruption to
        the messages occurred. This check adds some overhead, so it may
        be disabled in cases seeking extreme performance.
        """
        ),
    ] = True,
    partition_assignment_strategy: Annotated[
        Sequence[str],
        Doc(
            """
        List of objects to use to
        distribute partition ownership amongst consumer instances when
        group management is used. This preference is implicit in the order
        of the strategies in the list. When assignment strategy changes:
        to support a change to the assignment strategy, new versions must
        enable support both for the old assignment strategy and the new
        one. The coordinator will choose the old assignment strategy until
        all members have been updated. Then it will choose the new
        strategy.
        """
        ),
    ] = ("roundrobin",),
    max_poll_interval_ms: Annotated[
        int,
        Doc(
            """
        Maximum allowed time between calls to
        consume messages in batches. If this interval
        is exceeded the consumer is considered failed and the group will
        rebalance in order to reassign the partitions to another consumer
        group member. If API methods block waiting for messages, that time
        does not count against this timeout.
        """
        ),
    ] = 5 * 60 * 1000,
    session_timeout_ms: Annotated[
        int,
        Doc(
            """
        Client group session and failure detection
        timeout. The consumer sends periodic heartbeats
        (`heartbeat.interval.ms`) to indicate its liveness to the broker.
        If no hearts are received by the broker for a group member within
        the session timeout, the broker will remove the consumer from the
        group and trigger a rebalance. The allowed range is configured with
        the **broker** configuration properties
        `group.min.session.timeout.ms` and `group.max.session.timeout.ms`.
        """
        ),
    ] = 10 * 1000,
    heartbeat_interval_ms: Annotated[
        int,
        Doc(
            """
        The expected time in milliseconds
        between heartbeats to the consumer coordinator when using
        Kafka's group management feature. Heartbeats are used to ensure
        that the consumer's session stays active and to facilitate
        rebalancing when new consumers join or leave the group. The
        value must be set lower than `session_timeout_ms`, but typically
        should be set no higher than 1/3 of that value. It can be
        adjusted even lower to control the expected time for normal
        rebalances.
        """
        ),
    ] = 3 * 1000,
    isolation_level: Annotated[
        Literal["read_uncommitted", "read_committed"],
        Doc(
            """
        Controls how to read messages written
        transactionally.

        * `read_committed`, batch consumer will only return
        transactional messages which have been committed.

        * `read_uncommitted` (the default), batch consumer will
        return all messages, even transactional messages which have been
        aborted.

        Non-transactional messages will be returned unconditionally in
        either mode.

        Messages will always be returned in offset order. Hence, in
        `read_committed` mode, batch consumer will only return
        messages up to the last stable offset (LSO), which is the one less
        than the offset of the first open transaction. In particular any
        messages appearing after messages belonging to ongoing transactions
        will be withheld until the relevant transaction has been completed.
        As a result, `read_committed` consumers will not be able to read up
        to the high watermark when there are in flight transactions.
        Further, when in `read_committed` the seek_to_end method will
        return the LSO. See method docs below.
        """
        ),
    ] = "read_uncommitted",
    batch: Annotated[
        bool,
        Doc("Whether to consume messages in batches or not."),
    ] = False,
    max_records: Annotated[
        Optional[int],
        Doc("Number of messages to consume as one batch."),
    ] = None,
    # broker args
    dependencies: Annotated[
        Iterable["Depends"],
        Doc("Dependencies list (`[Depends(),]`) to apply to the subscriber."),
    ] = (),
    parser: Annotated[
        Optional["CustomCallable"],
        Doc("Parser to map original **Message** object to FastStream one."),
    ] = None,
    decoder: Annotated[
        Optional["CustomCallable"],
        Doc("Function to decode FastStream msg bytes body to python objects."),
    ] = None,
    middlewares: Annotated[
        Sequence["SubscriberMiddleware[KafkaMessage]"],
        Doc("Subscriber middlewares to wrap incoming message processing."),
    ] = (),
    filter: Annotated[
        "Filter[KafkaMessage]",
        Doc(
            "Overload subscriber to consume various messages from the same source."
        ),
        deprecated(
            "Deprecated in **FastStream 0.5.0**. "
            "Please, create `subscriber` object and use it explicitly instead. "
            "Argument will be removed in **FastStream 0.6.0**."
        ),
    ] = default_filter,
    retry: Annotated[
        bool,
        Doc("Whether to `nack` message at processing exception."),
    ] = False,
    no_ack: Annotated[
        bool,
        Doc("Whether to disable **FastStream** autoacknowledgement logic or not."),
    ] = False,
    no_reply: Annotated[
        bool,
        Doc(
            "Whether to disable **FastStream** RPC and Reply To auto responses or not."
        ),
    ] = False,
    # AsyncAPI args
    title: Annotated[
        Optional[str],
        Doc("AsyncAPI subscriber object title."),
    ] = None,
    description: Annotated[
        Optional[str],
        Doc(
            "AsyncAPI subscriber object description. "
            "Uses decorated docstring as default."
        ),
    ] = None,
    include_in_schema: Annotated[
        bool,
        Doc("Whetever to include operation in AsyncAPI schema or not."),
    ] = True,
) -> Union[
    "AsyncAPIDefaultSubscriber",
    "AsyncAPIBatchSubscriber",
]:
    """Create a subscriber for the given topics and attach the call options.

    The subscriber is built with ``create_subscriber``, passed to the parent
    class's ``subscriber`` and then given its filter, parser, decoder,
    dependencies and middlewares through ``add_call``.

    Raises:
        SetupError: If ``auto_commit`` is disabled and ``group_id`` is empty.
    """
    manual_commit = not auto_commit
    if manual_commit and not group_id:
        raise SetupError("You should install `group_id` with manual commit mode")

    # Consumer-level settings forwarded as ``connection_data``; note that
    # ``auto_commit`` travels under the ``enable_auto_commit`` key.
    consumer_settings = {
        "group_instance_id": group_instance_id,
        "fetch_max_wait_ms": fetch_max_wait_ms,
        "fetch_max_bytes": fetch_max_bytes,
        "fetch_min_bytes": fetch_min_bytes,
        "max_partition_fetch_bytes": max_partition_fetch_bytes,
        "auto_offset_reset": auto_offset_reset,
        "enable_auto_commit": auto_commit,
        "auto_commit_interval_ms": auto_commit_interval_ms,
        "check_crcs": check_crcs,
        "partition_assignment_strategy": partition_assignment_strategy,
        "max_poll_interval_ms": max_poll_interval_ms,
        "session_timeout_ms": session_timeout_ms,
        "heartbeat_interval_ms": heartbeat_interval_ms,
        "isolation_level": isolation_level,
    }

    created = create_subscriber(
        *topics,
        partitions=partitions,
        polling_interval=polling_interval,
        group_id=group_id,
        batch=batch,
        max_records=max_records,
        connection_data=consumer_settings,
        is_manual=manual_commit,
        # subscriber args
        retry=retry,
        no_ack=no_ack,
        no_reply=no_reply,
        broker_dependencies=self._dependencies,
        broker_middlewares=self._middlewares,
        # AsyncAPI
        title_=title,
        description_=description,
        include_in_schema=self._solve_include_in_schema(include_in_schema),
    )

    # The casts only narrow the static type for the checker.
    if not batch:
        created = cast("AsyncAPIDefaultSubscriber", created)
    else:
        created = cast("AsyncAPIBatchSubscriber", created)

    created = super().subscriber(created)  # type: ignore[arg-type,assignment]

    # An explicitly passed parser/decoder takes precedence over the
    # object-level one; a falsy argument falls back to ``self._parser`` /
    # ``self._decoder``.
    return created.add_call(
        filter_=filter,
        parser_=parser or self._parser,
        decoder_=decoder or self._decoder,
        dependencies_=dependencies,
        middlewares_=middlewares,
    )

publisher #

publisher(topic: str, *, key: Union[bytes, Any, None] = None, partition: Optional[int] = None, headers: Optional[Dict[str, str]] = None, reply_to: str = '', batch: Literal[False] = False, middlewares: Sequence[PublisherMiddleware] = (), title: Optional[str] = None, description: Optional[str] = None, schema: Optional[Any] = None, include_in_schema: bool = True) -> AsyncAPIDefaultPublisher
publisher(topic: str, *, key: Union[bytes, Any, None] = None, partition: Optional[int] = None, headers: Optional[Dict[str, str]] = None, reply_to: str = '', batch: Literal[True], middlewares: Sequence[PublisherMiddleware] = (), title: Optional[str] = None, description: Optional[str] = None, schema: Optional[Any] = None, include_in_schema: bool = True) -> AsyncAPIBatchPublisher
publisher(topic: str, *, key: Union[bytes, Any, None] = None, partition: Optional[int] = None, headers: Optional[Dict[str, str]] = None, reply_to: str = '', batch: bool = False, middlewares: Sequence[PublisherMiddleware] = (), title: Optional[str] = None, description: Optional[str] = None, schema: Optional[Any] = None, include_in_schema: bool = True) -> Union[AsyncAPIBatchPublisher, AsyncAPIDefaultPublisher]
publisher(topic, *, key=None, partition=None, headers=None, reply_to='', batch=False, middlewares=(), title=None, description=None, schema=None, include_in_schema=True)

Creates long-living and AsyncAPI-documented publisher object.

You can use it as a handler decorator (handler should be decorated by @broker.subscriber(...) too) - @broker.publisher(...). In such case publisher will publish your handler return value.

Or you can create a publisher object to call it later - broker.publisher(...).publish(...).

Source code in faststream/confluent/broker/registrator.py
@override
def publisher(
    self,
    topic: Annotated[
        str,
        Doc("Topic where the message will be published."),
    ],
    *,
    key: Annotated[
        Union[bytes, Any, None],
        Doc(
            """
        A key to associate with the message. Can be used to
        determine which partition to send the message to. If partition
        is `None` (and producer's partitioner config is left as default),
        then messages with the same key will be delivered to the same
        partition (but if key is `None`, partition is chosen randomly).
        Must be type `bytes`, or be serializable to bytes via configured
        `key_serializer`.
        """
        ),
    ] = None,
    partition: Annotated[
        Optional[int],
        Doc(
            """
        Specify a partition. If not set, the partition will be
        selected using the configured `partitioner`.
        """
        ),
    ] = None,
    headers: Annotated[
        Optional[Dict[str, str]],
        Doc(
            "Message headers to store metainformation. "
            "**content-type** and **correlation_id** will be set automatically by framework anyway. "
            "Can be overridden by `publish.headers` if specified."
        ),
    ] = None,
    reply_to: Annotated[
        str,
        Doc("Topic name to send response."),
    ] = "",
    batch: Annotated[
        bool,
        Doc("Whether to send messages in batches or not."),
    ] = False,
    # basic args
    middlewares: Annotated[
        Sequence["PublisherMiddleware"],
        Doc("Publisher middlewares to wrap outgoing messages."),
    ] = (),
    # AsyncAPI args
    title: Annotated[
        Optional[str],
        Doc("AsyncAPI publisher object title."),
    ] = None,
    description: Annotated[
        Optional[str],
        Doc("AsyncAPI publisher object description."),
    ] = None,
    schema: Annotated[
        Optional[Any],
        Doc(
            "AsyncAPI publishing message type. "
            "Should be any python-native object annotation or `pydantic.BaseModel`."
        ),
    ] = None,
    include_in_schema: Annotated[
        bool,
        Doc("Whetever to include operation in AsyncAPI schema or not."),
    ] = True,
) -> Union[
    "AsyncAPIBatchPublisher",
    "AsyncAPIDefaultPublisher",
]:
    """Create a long-living, AsyncAPI-documented publisher object.

    Use it as a handler decorator - `@broker.publisher(...)` - on a handler that
    is also decorated by `@broker.subscriber(...)`; the publisher then publishes
    the handler's return value.

    Alternatively, keep the returned object and call it later -
    `broker.publisher(...).publish(...)`.
    """
    created = AsyncAPIPublisher.create(
        # both args
        topic=topic,
        partition=partition,
        headers=headers,
        reply_to=reply_to,
        # default args
        key=key,
        # batch flag
        batch=batch,
        # publisher-specific
        middlewares=middlewares,
        broker_middlewares=self._middlewares,
        # AsyncAPI
        title_=title,
        description_=description,
        schema_=schema,
        include_in_schema=self._solve_include_in_schema(include_in_schema),
    )

    # The casts only narrow the static type for the checker.
    if not batch:
        created = cast("AsyncAPIDefaultPublisher", created)
    else:
        created = cast("AsyncAPIBatchPublisher", created)

    # The parent implementation receives the publisher and supplies the result.
    return super().publisher(created)  # type: ignore[return-value,arg-type]