Skip to content

KafkaBroker

faststream.confluent.KafkaBroker #

KafkaBroker(bootstrap_servers='localhost', *, request_timeout_ms=40 * 1000, retry_backoff_ms=100, metadata_max_age_ms=5 * 60 * 1000, connections_max_idle_ms=9 * 60 * 1000, client_id=SERVICE_NAME, allow_auto_create_topics=True, config=None, acks=EMPTY, compression_type=None, partitioner='consistent_random', max_request_size=1024 * 1024, linger_ms=0, enable_idempotence=False, transactional_id=None, transaction_timeout_ms=60 * 1000, graceful_timeout=15.0, decoder=None, parser=None, dependencies=(), middlewares=(), security=None, asyncapi_url=None, protocol=None, protocol_version='auto', description=None, tags=None, logger=EMPTY, log_level=logging.INFO, log_fmt=None, apply_types=True, validate=True, _get_dependant=None, _call_decorators=())

Bases: KafkaRegistrator, KafkaLoggingBroker

Source code in faststream/confluent/broker/broker.py
def __init__(
    self,
    bootstrap_servers: Annotated[
        Union[str, Iterable[str]],
        Doc(
            """
        A `host[:port]` string (or list of `host[:port]` strings) that the consumer should contact to bootstrap
        initial cluster metadata.

        This does not have to be the full node list.
        It just needs to have at least one broker that will respond to a
        Metadata API Request. Default port is 9092.
        """
        ),
    ] = "localhost",
    *,
    # both
    request_timeout_ms: Annotated[
        int,
        Doc("Client request timeout in milliseconds."),
    ] = 40 * 1000,
    retry_backoff_ms: Annotated[
        int,
        Doc("Milliseconds to backoff when retrying on errors."),
    ] = 100,
    metadata_max_age_ms: Annotated[
        int,
        Doc(
            """
        The period of time in milliseconds after
        which we force a refresh of metadata even if we haven't seen any
        partition leadership changes to proactively discover any new
        brokers or partitions.
        """
        ),
    ] = 5 * 60 * 1000,
    connections_max_idle_ms: Annotated[
        int,
        Doc(
            """
         Close idle connections after the number
        of milliseconds specified by this config. Specifying `None` will
        disable idle checks.
        """
        ),
    ] = 9 * 60 * 1000,
    client_id: Annotated[
        Optional[str],
        Doc(
            """
        A name for this client. This string is passed in
        each request to servers and can be used to identify specific
        server-side log entries that correspond to this client. Also
        submitted to :class:`~.consumer.group_coordinator.GroupCoordinator`
        for logging with respect to consumer group administration.
        """
        ),
    ] = SERVICE_NAME,
    allow_auto_create_topics: Annotated[
        bool,
        Doc(
            """
        Allow automatic topic creation on the broker when subscribing to or assigning non-existent topics.
        """
        ),
    ] = True,
    config: Annotated[
        Optional["ConfluentConfig"],
        Doc(
            """
            Extra configuration for the confluent-kafka-python
            producer/consumer. See `confluent_kafka.Config <https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#kafka-client-configuration>`_.
            """
        ),
    ] = None,
    # publisher args
    acks: Annotated[
        Literal[0, 1, -1, "all"],
        Doc(
            """
        One of ``0``, ``1``, ``all``. The number of acknowledgments
        the producer requires the leader to have received before considering a
        request complete. This controls the durability of records that are
        sent. The following settings are common:

        * ``0``: Producer will not wait for any acknowledgment from the server
          at all. The message will immediately be added to the socket
          buffer and considered sent. No guarantee can be made that the
          server has received the record in this case, and the retries
          configuration will not take effect (as the client won't
          generally know of any failures). The offset given back for each
          record will always be set to -1.
        * ``1``: The broker leader will write the record to its local log but
          will respond without awaiting full acknowledgement from all
          followers. In this case should the leader fail immediately
          after acknowledging the record but before the followers have
          replicated it then the record will be lost.
        * ``all``: The broker leader will wait for the full set of in-sync
          replicas to acknowledge the record. This guarantees that the
          record will not be lost as long as at least one in-sync replica
          remains alive. This is the strongest available guarantee.

        If unset, defaults to ``acks=1``. If `enable_idempotence` is
        :data:`True` defaults to ``acks=all``.
        """
        ),
    ] = EMPTY,
    compression_type: Annotated[
        Optional[Literal["gzip", "snappy", "lz4", "zstd"]],
        Doc(
            """
        The compression type for all data generated bythe producer.
        Compression is of full batches of data, so the efficacy of batching
        will also impact the compression ratio (more batching means better
        compression).
        """
        ),
    ] = None,
    partitioner: Annotated[
        Union[
            str,
            Callable[
                [bytes, List[Partition], List[Partition]],
                Partition,
            ],
        ],
        Doc(
            """
        Callable used to determine which partition
        each message is assigned to. Called (after key serialization):
        ``partitioner(key_bytes, all_partitions, available_partitions)``.
        The default partitioner implementation hashes each non-None key
        using the same murmur2 algorithm as the Java client so that
        messages with the same key are assigned to the same partition.
        When a key is :data:`None`, the message is delivered to a random partition
        (filtered to partitions with available leaders only, if possible).
        """
        ),
    ] = "consistent_random",
    max_request_size: Annotated[
        int,
        Doc(
            """
        The maximum size of a request. This is also
        effectively a cap on the maximum record size. Note that the server
        has its own cap on record size which may be different from this.
        This setting will limit the number of record batches the producer
        will send in a single request to avoid sending huge requests.
        """
        ),
    ] = 1024 * 1024,
    linger_ms: Annotated[
        int,
        Doc(
            """
        The producer groups together any records that arrive
        in between request transmissions into a single batched request.
        Normally this occurs only under load when records arrive faster
        than they can be sent out. However in some circumstances the client
        may want to reduce the number of requests even under moderate load.
        This setting accomplishes this by adding a small amount of
        artificial delay; that is, if first request is processed faster,
        than `linger_ms`, producer will wait ``linger_ms - process_time``.
        """
        ),
    ] = 0,
    enable_idempotence: Annotated[
        bool,
        Doc(
            """
        When set to `True`, the producer will
        ensure that exactly one copy of each message is written in the
        stream. If `False`, producer retries due to broker failures,
        etc., may write duplicates of the retried message in the stream.
        Note that enabling idempotence acks to set to ``all``. If it is not
        explicitly set by the user it will be chosen.
        """
        ),
    ] = False,
    transactional_id: Optional[str] = None,
    transaction_timeout_ms: int = 60 * 1000,
    # broker base args
    graceful_timeout: Annotated[
        Optional[float],
        Doc(
            "Graceful shutdown timeout. Broker waits for all running subscribers completion before shut down."
        ),
    ] = 15.0,
    decoder: Annotated[
        Optional["CustomCallable"],
        Doc("Custom decoder object."),
    ] = None,
    parser: Annotated[
        Optional["CustomCallable"],
        Doc("Custom parser object."),
    ] = None,
    dependencies: Annotated[
        Iterable["Depends"],
        Doc("Dependencies to apply to all broker subscribers."),
    ] = (),
    middlewares: Annotated[
        Iterable[
            Union[
                "BrokerMiddleware[Message]",
                "BrokerMiddleware[Tuple[Message, ...]]",
            ]
        ],
        Doc("Middlewares to apply to all broker publishers/subscribers."),
    ] = (),
    # AsyncAPI args
    security: Annotated[
        Optional["BaseSecurity"],
        Doc(
            "Security options to connect broker and generate AsyncAPI server security information."
        ),
    ] = None,
    asyncapi_url: Annotated[
        Union[str, Iterable[str], None],
        Doc("AsyncAPI hardcoded server addresses. Use `servers` if not specified."),
    ] = None,
    protocol: Annotated[
        Optional[str],
        Doc("AsyncAPI server protocol."),
    ] = None,
    protocol_version: Annotated[
        Optional[str],
        Doc("AsyncAPI server protocol version."),
    ] = "auto",
    description: Annotated[
        Optional[str],
        Doc("AsyncAPI server description."),
    ] = None,
    tags: Annotated[
        Optional[Iterable[Union["asyncapi.Tag", "asyncapi.TagDict"]]],
        Doc("AsyncAPI server tags."),
    ] = None,
    # logging args
    logger: Annotated[
        Optional["LoggerProto"],
        Doc("User specified logger to pass into Context and log service messages."),
    ] = EMPTY,
    log_level: Annotated[
        int,
        Doc("Service messages log level."),
    ] = logging.INFO,
    log_fmt: Annotated[
        Optional[str],
        Doc("Default logger log format."),
    ] = None,
    # FastDepends args
    apply_types: Annotated[
        bool,
        Doc("Whether to use FastDepends or not."),
    ] = True,
    validate: Annotated[
        bool,
        Doc("Whether to cast types using Pydantic validation."),
    ] = True,
    _get_dependant: Annotated[
        Optional[Callable[..., Any]],
        Doc("Custom library dependant generator callback."),
    ] = None,
    _call_decorators: Annotated[
        Iterable["Decorator"],
        Doc("Any custom decorator to apply to wrapped functions."),
    ] = (),
) -> None:
    if protocol is None:
        if security is not None and security.use_ssl:
            protocol = "kafka-secure"
        else:
            protocol = "kafka"

    servers = (
        [bootstrap_servers]
        if isinstance(bootstrap_servers, str)
        else list(bootstrap_servers)
    )

    if asyncapi_url is not None:
        if isinstance(asyncapi_url, str):
            asyncapi_url = [asyncapi_url]
        else:
            asyncapi_url = list(asyncapi_url)
    else:
        asyncapi_url = servers

    super().__init__(
        bootstrap_servers=servers,
        # both args
        client_id=client_id,
        request_timeout_ms=request_timeout_ms,
        retry_backoff_ms=retry_backoff_ms,
        metadata_max_age_ms=metadata_max_age_ms,
        allow_auto_create_topics=allow_auto_create_topics,
        connections_max_idle_ms=connections_max_idle_ms,
        # publisher args
        acks=acks,
        compression_type=compression_type,
        partitioner=partitioner,
        max_request_size=max_request_size,
        linger_ms=linger_ms,
        enable_idempotence=enable_idempotence,
        transactional_id=transactional_id,
        transaction_timeout_ms=transaction_timeout_ms,
        # Basic args
        graceful_timeout=graceful_timeout,
        dependencies=dependencies,
        decoder=decoder,
        parser=parser,
        middlewares=middlewares,
        # AsyncAPI args
        description=description,
        asyncapi_url=asyncapi_url,
        protocol=protocol,
        protocol_version=protocol_version,
        security=security,
        tags=tags,
        # Logging args
        logger=logger,
        log_level=log_level,
        log_fmt=log_fmt,
        # FastDepends args
        _get_dependant=_get_dependant,
        _call_decorators=_call_decorators,
        apply_types=apply_types,
        validate=validate,
    )
    self.client_id = client_id
    self._producer = None
    self.config = ConfluentFastConfig(config)

prefix instance-attribute #

prefix = prefix

include_in_schema instance-attribute #

include_in_schema = include_in_schema

logger instance-attribute #

logger

use_custom instance-attribute #

use_custom = True

running instance-attribute #

running = False

graceful_timeout instance-attribute #

graceful_timeout = graceful_timeout

protocol instance-attribute #

protocol = protocol

protocol_version instance-attribute #

protocol_version = protocol_version

description instance-attribute #

description = description

tags instance-attribute #

tags = tags

security instance-attribute #

security = security

url instance-attribute #

url

client_id instance-attribute #

client_id = client_id

config instance-attribute #

setup #

setup()

Prepare all Broker entities to startup.

Source code in faststream/broker/core/usecase.py
def setup(self) -> None:
    """Prepare all Broker entities to startup."""
    for h in self._subscribers.values():
        self.setup_subscriber(h)

    for p in self._publishers.values():
        self.setup_publisher(p)

add_middleware #

add_middleware(middleware)

Append BrokerMiddleware to the end of middlewares list.

Current middleware will be used as a most inner of already existed ones.

Source code in faststream/broker/core/abc.py
def add_middleware(self, middleware: "BrokerMiddleware[MsgType]") -> None:
    """Append BrokerMiddleware to the end of middlewares list.

    Current middleware will be used as a most inner of already existed ones.
    """
    self._middlewares = (*self._middlewares, middleware)

    for sub in self._subscribers.values():
        sub.add_middleware(middleware)

    for pub in self._publishers.values():
        pub.add_middleware(middleware)

subscriber #

subscriber(*topics, partitions=(), polling_interval=0.1, group_id=None, group_instance_id=None, fetch_max_wait_ms=500, fetch_max_bytes=50 * 1024 * 1024, fetch_min_bytes=1, max_partition_fetch_bytes=1 * 1024 * 1024, auto_offset_reset='latest', auto_commit=True, auto_commit_interval_ms=5 * 1000, check_crcs=True, partition_assignment_strategy=('roundrobin'), max_poll_interval_ms=5 * 60 * 1000, session_timeout_ms=10 * 1000, heartbeat_interval_ms=3 * 1000, isolation_level='read_uncommitted', batch=False, max_records=None, dependencies=(), parser=None, decoder=None, middlewares=(), filter=default_filter, retry=False, no_ack=False, no_reply=False, title=None, description=None, include_in_schema=True)
Source code in faststream/confluent/broker/registrator.py
 909
 910
 911
 912
 913
 914
 915
 916
 917
 918
 919
 920
 921
 922
 923
 924
 925
 926
 927
 928
 929
 930
 931
 932
 933
 934
 935
 936
 937
 938
 939
 940
 941
 942
 943
 944
 945
 946
 947
 948
 949
 950
 951
 952
 953
 954
 955
 956
 957
 958
 959
 960
 961
 962
 963
 964
 965
 966
 967
 968
 969
 970
 971
 972
 973
 974
 975
 976
 977
 978
 979
 980
 981
 982
 983
 984
 985
 986
 987
 988
 989
 990
 991
 992
 993
 994
 995
 996
 997
 998
 999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
@override
def subscriber(
    self,
    *topics: Annotated[
        str,
        Doc("Kafka topics to consume messages from."),
    ],
    partitions: Sequence["TopicPartition"] = (),
    polling_interval: float = 0.1,
    group_id: Annotated[
        Optional[str],
        Doc(
            """
        Name of the consumer group to join for dynamic
        partition assignment (if enabled), and to use for fetching and
        committing offsets. If `None`, auto-partition assignment (via
        group coordinator) and offset commits are disabled.
        """
        ),
    ] = None,
    group_instance_id: Annotated[
        Optional[str],
        Doc(
            """
        A unique string that identifies the consumer instance.
        If set, the consumer is treated as a static member of the group
        and does not participate in consumer group management (e.g.
        partition assignment, rebalances). This can be used to assign
        partitions to specific consumers, rather than letting the group
        assign partitions based on consumer metadata.
        """
        ),
    ] = None,
    fetch_max_wait_ms: Annotated[
        int,
        Doc(
            """
        The maximum amount of time in milliseconds
        the server will block before answering the fetch request if
        there isn't sufficient data to immediately satisfy the
        requirement given by `fetch_min_bytes`.
        """
        ),
    ] = 500,
    fetch_max_bytes: Annotated[
        int,
        Doc(
            """
        The maximum amount of data the server should
        return for a fetch request. This is not an absolute maximum, if
        the first message in the first non-empty partition of the fetch
        is larger than this value, the message will still be returned
        to ensure that the consumer can make progress. NOTE: consumer
        performs fetches to multiple brokers in parallel so memory
        usage will depend on the number of brokers containing
        partitions for the topic.
        """
        ),
    ] = 50 * 1024 * 1024,
    fetch_min_bytes: Annotated[
        int,
        Doc(
            """
        Minimum amount of data the server should
        return for a fetch request, otherwise wait up to
        `fetch_max_wait_ms` for more data to accumulate.
        """
        ),
    ] = 1,
    max_partition_fetch_bytes: Annotated[
        int,
        Doc(
            """
        The maximum amount of data
        per-partition the server will return. The maximum total memory
        used for a request ``= #partitions * max_partition_fetch_bytes``.
        This size must be at least as large as the maximum message size
        the server allows or else it is possible for the producer to
        send messages larger than the consumer can fetch. If that
        happens, the consumer can get stuck trying to fetch a large
        message on a certain partition.
        """
        ),
    ] = 1 * 1024 * 1024,
    auto_offset_reset: Annotated[
        Literal["latest", "earliest", "none"],
        Doc(
            """
        A policy for resetting offsets on `OffsetOutOfRangeError` errors:

        * `earliest` will move to the oldest available message
        * `latest` will move to the most recent
        * `none` will raise an exception so you can handle this case
        """
        ),
    ] = "latest",
    auto_commit: Annotated[
        bool,
        Doc(
            """
        If `True` the consumer's offset will be
        periodically committed in the background.
        """
        ),
    ] = True,
    auto_commit_interval_ms: Annotated[
        int,
        Doc(
            """
        Milliseconds between automatic
        offset commits, if `auto_commit` is `True`."""
        ),
    ] = 5 * 1000,
    check_crcs: Annotated[
        bool,
        Doc(
            """
        Automatically check the CRC32 of the records
        consumed. This ensures no on-the-wire or on-disk corruption to
        the messages occurred. This check adds some overhead, so it may
        be disabled in cases seeking extreme performance.
        """
        ),
    ] = True,
    partition_assignment_strategy: Annotated[
        Sequence[str],
        Doc(
            """
        List of objects to use to
        distribute partition ownership amongst consumer instances when
        group management is used. This preference is implicit in the order
        of the strategies in the list. When assignment strategy changes:
        to support a change to the assignment strategy, new versions must
        enable support both for the old assignment strategy and the new
        one. The coordinator will choose the old assignment strategy until
        all members have been updated. Then it will choose the new
        strategy.
        """
        ),
    ] = ("roundrobin",),
    max_poll_interval_ms: Annotated[
        int,
        Doc(
            """
        Maximum allowed time between calls to
        consume messages in batches. If this interval
        is exceeded the consumer is considered failed and the group will
        rebalance in order to reassign the partitions to another consumer
        group member. If API methods block waiting for messages, that time
        does not count against this timeout.
        """
        ),
    ] = 5 * 60 * 1000,
    session_timeout_ms: Annotated[
        int,
        Doc(
            """
        Client group session and failure detection
        timeout. The consumer sends periodic heartbeats
        (`heartbeat.interval.ms`) to indicate its liveness to the broker.
        If no hearts are received by the broker for a group member within
        the session timeout, the broker will remove the consumer from the
        group and trigger a rebalance. The allowed range is configured with
        the **broker** configuration properties
        `group.min.session.timeout.ms` and `group.max.session.timeout.ms`.
        """
        ),
    ] = 10 * 1000,
    heartbeat_interval_ms: Annotated[
        int,
        Doc(
            """
        The expected time in milliseconds
        between heartbeats to the consumer coordinator when using
        Kafka's group management feature. Heartbeats are used to ensure
        that the consumer's session stays active and to facilitate
        rebalancing when new consumers join or leave the group. The
        value must be set lower than `session_timeout_ms`, but typically
        should be set no higher than 1/3 of that value. It can be
        adjusted even lower to control the expected time for normal
        rebalances.
        """
        ),
    ] = 3 * 1000,
    isolation_level: Annotated[
        Literal["read_uncommitted", "read_committed"],
        Doc(
            """
        Controls how to read messages written
        transactionally.

        * `read_committed`, batch consumer will only return
        transactional messages which have been committed.

        * `read_uncommitted` (the default), batch consumer will
        return all messages, even transactional messages which have been
        aborted.

        Non-transactional messages will be returned unconditionally in
        either mode.

        Messages will always be returned in offset order. Hence, in
        `read_committed` mode, batch consumer will only return
        messages up to the last stable offset (LSO), which is the one less
        than the offset of the first open transaction. In particular any
        messages appearing after messages belonging to ongoing transactions
        will be withheld until the relevant transaction has been completed.
        As a result, `read_committed` consumers will not be able to read up
        to the high watermark when there are in flight transactions.
        Further, when in `read_committed` the seek_to_end method will
        return the LSO. See method docs below.
        """
        ),
    ] = "read_uncommitted",
    batch: Annotated[
        bool,
        Doc("Whether to consume messages in batches or not."),
    ] = False,
    max_records: Annotated[
        Optional[int],
        Doc("Number of messages to consume as one batch."),
    ] = None,
    # broker args
    dependencies: Annotated[
        Iterable["Depends"],
        Doc("Dependencies list (`[Depends(),]`) to apply to the subscriber."),
    ] = (),
    parser: Annotated[
        Optional["CustomCallable"],
        Doc("Parser to map original **Message** object to FastStream one."),
    ] = None,
    decoder: Annotated[
        Optional["CustomCallable"],
        Doc("Function to decode FastStream msg bytes body to python objects."),
    ] = None,
    middlewares: Annotated[
        Iterable["SubscriberMiddleware[KafkaMessage]"],
        Doc("Subscriber middlewares to wrap incoming message processing."),
    ] = (),
    filter: Annotated[
        "Filter[KafkaMessage]",
        Doc(
            "Overload subscriber to consume various messages from the same source."
        ),
        deprecated(
            "Deprecated in **FastStream 0.5.0**. "
            "Please, create `subscriber` object and use it explicitly instead. "
            "Argument will be removed in **FastStream 0.6.0**."
        ),
    ] = default_filter,
    retry: Annotated[
        bool,
        Doc("Whether to `nack` message at processing exception."),
    ] = False,
    no_ack: Annotated[
        bool,
        Doc("Whether to disable **FastStream** autoacknowledgement logic or not."),
    ] = False,
    no_reply: Annotated[
        bool,
        Doc(
            "Whether to disable **FastStream** RPC and Reply To auto responses or not."
        ),
    ] = False,
    # AsyncAPI args
    title: Annotated[
        Optional[str],
        Doc("AsyncAPI subscriber object title."),
    ] = None,
    description: Annotated[
        Optional[str],
        Doc(
            "AsyncAPI subscriber object description. "
            "Uses decorated docstring as default."
        ),
    ] = None,
    include_in_schema: Annotated[
        bool,
        Doc("Whetever to include operation in AsyncAPI schema or not."),
    ] = True,
) -> Union[
    "AsyncAPIDefaultSubscriber",
    "AsyncAPIBatchSubscriber",
]:
    if not auto_commit and not group_id:
        raise SetupError("You should install `group_id` with manual commit mode")

    subscriber = super().subscriber(
        create_subscriber(
            *topics,
            polling_interval=polling_interval,
            partitions=partitions,
            batch=batch,
            max_records=max_records,
            group_id=group_id,
            connection_data={
                "group_instance_id": group_instance_id,
                "fetch_max_wait_ms": fetch_max_wait_ms,
                "fetch_max_bytes": fetch_max_bytes,
                "fetch_min_bytes": fetch_min_bytes,
                "max_partition_fetch_bytes": max_partition_fetch_bytes,
                "auto_offset_reset": auto_offset_reset,
                "enable_auto_commit": auto_commit,
                "auto_commit_interval_ms": auto_commit_interval_ms,
                "check_crcs": check_crcs,
                "partition_assignment_strategy": partition_assignment_strategy,
                "max_poll_interval_ms": max_poll_interval_ms,
                "session_timeout_ms": session_timeout_ms,
                "heartbeat_interval_ms": heartbeat_interval_ms,
                "isolation_level": isolation_level,
            },
            is_manual=not auto_commit,
            # subscriber args
            no_ack=no_ack,
            no_reply=no_reply,
            retry=retry,
            broker_middlewares=self._middlewares,
            broker_dependencies=self._dependencies,
            # AsyncAPI
            title_=title,
            description_=description,
            include_in_schema=self._solve_include_in_schema(include_in_schema),
        )
    )

    if batch:
        return cast("AsyncAPIBatchSubscriber", subscriber).add_call(
            filter_=filter,
            parser_=parser or self._parser,
            decoder_=decoder or self._decoder,
            dependencies_=dependencies,
            middlewares_=middlewares,
        )
    else:
        return cast("AsyncAPIDefaultSubscriber", subscriber).add_call(
            filter_=filter,
            parser_=parser or self._parser,
            decoder_=decoder or self._decoder,
            dependencies_=dependencies,
            middlewares_=middlewares,
        )

publisher #

publisher(topic, *, key=None, partition=None, headers=None, reply_to='', batch=False, middlewares=(), title=None, description=None, schema=None, include_in_schema=True)

Creates long-living and AsyncAPI-documented publisher object.

You can use it as a handler decorator (handler should be decorated by @broker.subscriber(...) too) - @broker.publisher(...). In such case publisher will publish your handler return value.

Or you can create a publisher object to call it lately - broker.publisher(...).publish(...).

Source code in faststream/confluent/broker/registrator.py
@override
def publisher(
    self,
    topic: Annotated[
        str,
        Doc("Topic where the message will be published."),
    ],
    *,
    key: Annotated[
        Union[bytes, Any, None],
        Doc(
            """
        A key to associate with the message. Can be used to
        determine which partition to send the message to. If partition
        is `None` (and producer's partitioner config is left as default),
        then messages with the same key will be delivered to the same
        partition (but if key is `None`, partition is chosen randomly).
        Must be type `bytes`, or be serializable to bytes via configured
        `key_serializer`.
        """
        ),
    ] = None,
    partition: Annotated[
        Optional[int],
        Doc(
            """
        Specify a partition. If not set, the partition will be
        selected using the configured `partitioner`.
        """
        ),
    ] = None,
    headers: Annotated[
        Optional[Dict[str, str]],
        Doc(
            "Message headers to store metainformation. "
            "**content-type** and **correlation_id** will be set automatically by framework anyway. "
            "Can be overridden by `publish.headers` if specified."
        ),
    ] = None,
    reply_to: Annotated[
        str,
        Doc("Topic name to send response."),
    ] = "",
    batch: Annotated[
        bool,
        Doc("Whether to send messages in batches or not."),
    ] = False,
    # basic args
    middlewares: Annotated[
        Iterable["PublisherMiddleware"],
        Doc("Publisher middlewares to wrap outgoing messages."),
    ] = (),
    # AsyncAPI args
    title: Annotated[
        Optional[str],
        Doc("AsyncAPI publisher object title."),
    ] = None,
    description: Annotated[
        Optional[str],
        Doc("AsyncAPI publisher object description."),
    ] = None,
    schema: Annotated[
        Optional[Any],
        Doc(
            "AsyncAPI publishing message type. "
            "Should be any python-native object annotation or `pydantic.BaseModel`."
        ),
    ] = None,
    include_in_schema: Annotated[
        bool,
        Doc("Whetever to include operation in AsyncAPI schema or not."),
    ] = True,
) -> Union[
    "AsyncAPIBatchPublisher",
    "AsyncAPIDefaultPublisher",
]:
    """Creates long-living and AsyncAPI-documented publisher object.

    You can use it as a handler decorator (handler should be decorated by `@broker.subscriber(...)` too) - `@broker.publisher(...)`.
    In such case publisher will publish your handler return value.

    Or you can create a publisher object to call it lately - `broker.publisher(...).publish(...)`.
    """
    publisher = AsyncAPIPublisher.create(
        # batch flag
        batch=batch,
        # default args
        key=key,
        # both args
        topic=topic,
        partition=partition,
        headers=headers,
        reply_to=reply_to,
        # publisher-specific
        broker_middlewares=self._middlewares,
        middlewares=middlewares,
        # AsyncAPI
        title_=title,
        description_=description,
        schema_=schema,
        include_in_schema=self._solve_include_in_schema(include_in_schema),
    )

    if batch:
        return cast("AsyncAPIBatchPublisher", super().publisher(publisher))
    else:
        return cast("AsyncAPIDefaultPublisher", super().publisher(publisher))

include_router #

include_router(router, *, prefix='', dependencies=(), middlewares=(), include_in_schema=None)

Includes a router in the current object.

Source code in faststream/broker/core/abc.py
def include_router(
    self,
    router: "ABCBroker[Any]",
    *,
    prefix: str = "",
    dependencies: Iterable["Depends"] = (),
    middlewares: Iterable["BrokerMiddleware[MsgType]"] = (),
    include_in_schema: Optional[bool] = None,
) -> None:
    """Includes a router in the current object."""
    for h in router._subscribers.values():
        h.add_prefix("".join((self.prefix, prefix)))

        if (key := hash(h)) not in self._subscribers:
            if include_in_schema is None:
                h.include_in_schema = self._solve_include_in_schema(
                    h.include_in_schema
                )
            else:
                h.include_in_schema = include_in_schema

            h._broker_middlewares = (
                *self._middlewares,
                *middlewares,
                *h._broker_middlewares,
            )
            h._broker_dependencies = (
                *self._dependencies,
                *dependencies,
                *h._broker_dependencies,
            )
            self._subscribers = {**self._subscribers, key: h}

    for p in router._publishers.values():
        p.add_prefix(self.prefix)

        if (key := hash(p)) not in self._publishers:
            if include_in_schema is None:
                p.include_in_schema = self._solve_include_in_schema(
                    p.include_in_schema
                )
            else:
                p.include_in_schema = include_in_schema

            p._broker_middlewares = (
                *self._middlewares,
                *middlewares,
                *p._broker_middlewares,
            )
            self._publishers = {**self._publishers, key: p}

include_routers #

include_routers(*routers)

Includes routers in the object.

Source code in faststream/broker/core/abc.py
def include_routers(
    self,
    *routers: "ABCBroker[MsgType]",
) -> None:
    """Includes routers in the object."""
    for r in routers:
        self.include_router(r)

get_fmt #

get_fmt()
Source code in faststream/confluent/broker/logging.py
def get_fmt(self) -> str:
    return (
        "%(asctime)s %(levelname)-8s - "
        + f"%(topic)-{self._max_topic_len}s | "
        + (f"%(group_id)-{self._max_group_len}s | " if self._max_group_len else "")
        + f"%(message_id)-{self.__max_msg_id_ln}s "
        + "- %(message)s"
    )

setup_subscriber #

setup_subscriber(subscriber, **kwargs)

Setup the Subscriber to prepare it to starting.

Source code in faststream/broker/core/usecase.py
def setup_subscriber(
    self,
    subscriber: SubscriberProto[MsgType],
    **kwargs: Any,
) -> None:
    """Setup the Subscriber to prepare it to starting."""
    data = self._subscriber_setup_extra.copy()
    data.update(kwargs)
    subscriber.setup(**data)

setup_publisher #

setup_publisher(publisher, **kwargs)

Setup the Publisher to prepare it to starting.

Source code in faststream/broker/core/usecase.py
def setup_publisher(
    self,
    publisher: "PublisherProto[MsgType]",
    **kwargs: Any,
) -> None:
    """Setup the Publisher to prepare it to starting."""
    data = self._publisher_setup_extra.copy()
    data.update(kwargs)
    publisher.setup(**data)

close async #

close(exc_type=None, exc_val=None, exc_tb=None)

Closes the object.

Source code in faststream/broker/core/usecase.py
async def close(
    self,
    exc_type: Optional[Type[BaseException]] = None,
    exc_val: Optional[BaseException] = None,
    exc_tb: Optional["TracebackType"] = None,
) -> None:
    """Closes the object."""
    self.running = False

    for h in self._subscribers.values():
        await h.close()

    if self._connection is not None:
        await self._close(exc_type, exc_val, exc_tb)

connect async #

connect(bootstrap_servers=EMPTY, **kwargs)
Source code in faststream/confluent/broker/broker.py
async def connect(
    self,
    bootstrap_servers: Annotated[
        Union[str, Iterable[str]],
        Doc("Kafka addresses to connect."),
    ] = EMPTY,
    **kwargs: Any,
) -> Callable[..., AsyncConfluentConsumer]:
    if bootstrap_servers is not EMPTY:
        kwargs["bootstrap_servers"] = bootstrap_servers

    return await super().connect(**kwargs)

start async #

start()
Source code in faststream/confluent/broker/broker.py
async def start(self) -> None:
    await super().start()

    for handler in self._subscribers.values():
        self._log(
            f"`{handler.call_name}` waiting for messages",
            extra=handler.get_log_context(None),
        )
        await handler.start()

publish async #

publish(message, topic, key=None, partition=None, timestamp_ms=None, headers=None, correlation_id=None, *, reply_to='', no_confirm=False, **kwargs)
Source code in faststream/confluent/broker/broker.py
@override
async def publish(  # type: ignore[override]
    self,
    message: "SendableMessage",
    topic: str,
    key: Optional[bytes] = None,
    partition: Optional[int] = None,
    timestamp_ms: Optional[int] = None,
    headers: Optional[Dict[str, str]] = None,
    correlation_id: Optional[str] = None,
    *,
    reply_to: str = "",
    no_confirm: bool = False,
    # extra options to be compatible with test client
    **kwargs: Any,
) -> Optional[Any]:
    correlation_id = correlation_id or gen_cor_id()

    return await super().publish(
        message,
        producer=self._producer,
        topic=topic,
        key=key,
        partition=partition,
        timestamp_ms=timestamp_ms,
        headers=headers,
        correlation_id=correlation_id,
        reply_to=reply_to,
        no_confirm=no_confirm,
        **kwargs,
    )

request async #

request(message, topic, key=None, partition=None, timestamp_ms=None, headers=None, correlation_id=None, timeout=0.5)
Source code in faststream/confluent/broker/broker.py
@override
async def request(  # type: ignore[override]
    self,
    message: "SendableMessage",
    topic: str,
    key: Optional[bytes] = None,
    partition: Optional[int] = None,
    timestamp_ms: Optional[int] = None,
    headers: Optional[Dict[str, str]] = None,
    correlation_id: Optional[str] = None,
    timeout: float = 0.5,
) -> Optional[Any]:
    correlation_id = correlation_id or gen_cor_id()

    return await super().request(
        message,
        producer=self._producer,
        topic=topic,
        key=key,
        partition=partition,
        timestamp_ms=timestamp_ms,
        headers=headers,
        correlation_id=correlation_id,
        timeout=timeout,
    )

publish_batch async #

publish_batch(*msgs, topic, partition=None, timestamp_ms=None, headers=None, reply_to='', correlation_id=None, no_confirm=False)
Source code in faststream/confluent/broker/broker.py
async def publish_batch(
    self,
    *msgs: "SendableMessage",
    topic: str,
    partition: Optional[int] = None,
    timestamp_ms: Optional[int] = None,
    headers: Optional[Dict[str, str]] = None,
    reply_to: str = "",
    correlation_id: Optional[str] = None,
    no_confirm: bool = False,
) -> None:
    assert self._producer, NOT_CONNECTED_YET  # nosec B101

    correlation_id = correlation_id or gen_cor_id()

    call: AsyncFunc = self._producer.publish_batch
    for m in self._middlewares:
        call = partial(m(None).publish_scope, call)

    await call(
        *msgs,
        topic=topic,
        partition=partition,
        timestamp_ms=timestamp_ms,
        headers=headers,
        reply_to=reply_to,
        correlation_id=correlation_id,
        no_confirm=no_confirm,
    )

ping async #

ping(timeout)
Source code in faststream/confluent/broker/broker.py
@override
async def ping(self, timeout: Optional[float]) -> bool:
    sleep_time = (timeout or 10) / 10

    with anyio.move_on_after(timeout) as cancel_scope:
        if self._producer is None:
            return False

        while True:
            if cancel_scope.cancel_called:
                return False

            if await self._producer._producer.ping(timeout=timeout):
                return True

            await anyio.sleep(sleep_time)

    return False