faststream.confluent.KafkaBroker #

KafkaBroker(bootstrap_servers: str | Iterable[str] = 'localhost', *, protocol: str | None = None, protocol_version: str = 'auto', client_id: str = 'faststream-' + __version__, security: BaseSecurity | None = None, **kwargs: Any)

Bases: KafkaLoggingMixin, BrokerAsyncUsecase[Message, ConsumerConnectionParams]

KafkaBroker is a class for managing Kafka message consumption and publishing.

It extends BrokerAsyncUsecase to handle asynchronous operations.

PARAMETER DESCRIPTION
bootstrap_servers

Kafka bootstrap server(s).

TYPE: Union[str, Iterable[str]] DEFAULT: 'localhost'

protocol

The protocol used; when None it defaults to "kafka", or to "kafka-secure" if SSL security is enabled.

TYPE: Optional[str] DEFAULT: None

protocol_version

The Kafka protocol version (default is "auto").

TYPE: str DEFAULT: 'auto'

client_id

The client ID for the Kafka client.

TYPE: str DEFAULT: 'faststream-' + __version__

security

Security protocol to use in communication with the broker (default is None).

TYPE: Optional[BaseSecurity] DEFAULT: None

**kwargs

Additional keyword arguments.

TYPE: Any DEFAULT: {}

METHOD DESCRIPTION
connect

Establishes a connection to Kafka.

start

Starts the KafkaBroker and message handlers.

publish

Publishes a message to Kafka.

Initialize a KafkaBroker instance.

Source code in faststream/confluent/broker.py
def __init__(
    self,
    bootstrap_servers: Union[str, Iterable[str]] = "localhost",
    *,
    protocol: Optional[str] = None,
    protocol_version: str = "auto",
    client_id: str = "faststream-" + __version__,
    security: Optional[BaseSecurity] = None,
    **kwargs: Any,
) -> None:
    """Initialize a KafkaBroker instance.

    Args:
        bootstrap_servers (Union[str, Iterable[str]]): Kafka bootstrap server(s).
        protocol (str): The protocol used (default is "kafka").
        protocol_version (str): The Kafka protocol version (default is "auto").
        client_id (str): The client ID for the Kafka client.
        security (Optional[BaseSecurity]): Security protocol to use in communication with the broker (default is None).
        **kwargs: Additional keyword arguments.
    """
    if protocol is None:
        if security is not None and security.use_ssl:
            protocol = "kafka-secure"
        else:
            protocol = "kafka"

    super().__init__(
        url=[bootstrap_servers]
        if isinstance(bootstrap_servers, str)
        else list(bootstrap_servers),
        protocol=protocol,
        protocol_version=protocol_version,
        security=security,
        **kwargs,
        client_id=client_id,
        bootstrap_servers=bootstrap_servers,
    )
    self.client_id = client_id
    self._producer = None
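
A minimal usage sketch (the server address, client_id, and topic name are illustrative):

from faststream import FastStream
from faststream.confluent import KafkaBroker

# A single bootstrap server; pass a list of "host:port" strings for a cluster.
broker = KafkaBroker("localhost:9092", client_id="my-service")
app = FastStream(broker)


@broker.subscriber("in-topic")
async def handle(msg: str) -> None:
    print(msg)

The app can then be served with the faststream CLI (faststream run module:app).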

client_id instance-attribute #

client_id = client_id

dependencies instance-attribute #

dependencies: Sequence[Depends] = dependencies

description instance-attribute #

description = description

fmt property #

fmt: str

graceful_timeout instance-attribute #

graceful_timeout = graceful_timeout

handlers instance-attribute #

handlers: dict[str, Handler]

log_level instance-attribute #

log_level: int

logger instance-attribute #

logger: Optional[Logger]

middlewares instance-attribute #

middlewares: Sequence[Callable[[MsgType], BaseMiddleware]]

protocol instance-attribute #

protocol = protocol

protocol_version instance-attribute #

protocol_version = protocol_version

security instance-attribute #

security = security

started instance-attribute #

started: bool = False

tags instance-attribute #

tags = tags

url instance-attribute #

url: List[str]

close async #

close(exc_type: Optional[Type[BaseException]] = None, exc_val: Optional[BaseException] = None, exec_tb: Optional[TracebackType] = None) -> None

Closes the object.

PARAMETER DESCRIPTION
exc_type

The type of the exception being handled, if any.

TYPE: Optional[Type[BaseException]] DEFAULT: None

exc_val

The exception instance being handled, if any.

TYPE: Optional[BaseException] DEFAULT: None

exec_tb

The traceback of the exception being handled, if any.

TYPE: Optional[TracebackType] DEFAULT: None

RETURNS DESCRIPTION
None

None

RAISES DESCRIPTION
NotImplementedError

If the method is not implemented.

Source code in faststream/broker/core/asynchronous.py
async def close(
    self,
    exc_type: Optional[Type[BaseException]] = None,
    exc_val: Optional[BaseException] = None,
    exec_tb: Optional[TracebackType] = None,
) -> None:
    """Closes the object.

    Args:
        exc_type: The type of the exception being handled, if any.
        exc_val: The exception instance being handled, if any.
        exec_tb: The traceback of the exception being handled, if any.

    Returns:
        None

    Raises:
        NotImplementedError: If the method is not implemented.

    """
    super()._abc_close(exc_type, exc_val, exec_tb)

    for h in self.handlers.values():
        await h.close()

    if self._connection is not None:
        await self._close(exc_type, exc_val, exec_tb)
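
close is normally called for you when a FastStream application shuts down (or when the broker is used as an async context manager). A manual lifecycle sketch, with illustrative connection details:

import asyncio

from faststream.confluent import KafkaBroker


async def main() -> None:
    broker = KafkaBroker("localhost:9092")
    await broker.start()
    try:
        await broker.publish("ping", topic="health-checks")  # illustrative topic
    finally:
        # Always close the broker to stop handlers and release the connection.
        await broker.close()


asyncio.run(main())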

connect async #

connect(*args: Any, **kwargs: Any) -> ConsumerConnectionParams

Establishes a connection to Kafka and returns connection parameters.

PARAMETER DESCRIPTION
*args

Additional arguments.

TYPE: Any DEFAULT: ()

**kwargs

Additional keyword arguments.

TYPE: Any DEFAULT: {}

RETURNS DESCRIPTION
ConsumerConnectionParams

The connection parameters.

TYPE: ConsumerConnectionParams

Source code in faststream/confluent/broker.py
async def connect(
    self,
    *args: Any,
    **kwargs: Any,
) -> ConsumerConnectionParams:
    """Establishes a connection to Kafka and returns connection parameters.

    Args:
        *args: Additional arguments.
        **kwargs: Additional keyword arguments.

    Returns:
        ConsumerConnectionParams: The connection parameters.
    """
    connection = await super().connect(*args, **kwargs)
    for p in self._publishers.values():
        p._producer = self._producer
    return connection
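
connect is useful when the broker is driven outside of a running FastStream app, for example when publishing from a one-off script. A sketch under that assumption (topic name is illustrative):

import asyncio

from faststream.confluent import KafkaBroker

broker = KafkaBroker("localhost:9092")


async def main() -> None:
    # connect() prepares the producer and returns the consumer connection parameters.
    await broker.connect()
    await broker.publish("hello", topic="greetings")
    await broker.close()


asyncio.run(main())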

include_router #

include_router(router: BrokerRouter[Any, MsgType]) -> None

Includes a router in the current object.

PARAMETER DESCRIPTION
router

The router to be included.

TYPE: BrokerRouter[Any, MsgType]

RETURNS DESCRIPTION
None

None

Source code in faststream/broker/core/abc.py
def include_router(self, router: BrokerRouter[Any, MsgType]) -> None:
    """Includes a router in the current object.

    Args:
        router: The router to be included.

    Returns:
        None

    """
    for r in router._handlers:
        self.subscriber(*r.args, **r.kwargs)(r.call)

    self._publishers = {**self._publishers, **router._publishers}
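
Routers let subscribers and publishers be declared in a separate module and attached to the broker later. A sketch, assuming KafkaRouter is imported from faststream.confluent; the prefix and topic names are illustrative:

from faststream.confluent import KafkaBroker, KafkaRouter

router = KafkaRouter(prefix="service_")


@router.subscriber("events")  # handled as the "service_events" topic
async def on_event(msg: str) -> None:
    ...


broker = KafkaBroker("localhost:9092")
broker.include_router(router)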

include_routers #

include_routers(*routers: BrokerRouter[Any, MsgType]) -> None

Includes routers in the current object.

PARAMETER DESCRIPTION
*routers

Variable length argument list of routers to include.

TYPE: BrokerRouter[Any, MsgType] DEFAULT: ()

RETURNS DESCRIPTION
None

None

Source code in faststream/broker/core/abc.py
def include_routers(self, *routers: BrokerRouter[Any, MsgType]) -> None:
    """Includes routers in the current object.

    Args:
        *routers: Variable length argument list of routers to include.

    Returns:
        None

    """
    for r in routers:
        self.include_router(r)
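
Several routers can be attached in a single call; this sketch assumes two hypothetical modules that each expose a router:

from faststream.confluent import KafkaBroker

from orders import router as orders_router      # hypothetical module
from payments import router as payments_router  # hypothetical module

broker = KafkaBroker("localhost:9092")
broker.include_routers(orders_router, payments_router)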

publish async #

publish(*args: Any, **kwargs: Any) -> None

Publish a message to Kafka.

PARAMETER DESCRIPTION
*args

Positional arguments for message publishing.

TYPE: Any DEFAULT: ()

**kwargs

Keyword arguments for message publishing.

TYPE: Any DEFAULT: {}

RAISES DESCRIPTION
RuntimeError

If KafkaBroker is not started yet.

Source code in faststream/confluent/broker.py
@override
async def publish(  # type: ignore[override]
    self,
    *args: Any,
    **kwargs: Any,
) -> None:
    """Publish a message to Kafka.

    Args:
        *args: Positional arguments for message publishing.
        **kwargs: Keyword arguments for message publishing.

    Raises:
        RuntimeError: If KafkaBroker is not started yet.
    """
    assert self._producer, NOT_CONNECTED_YET  # nosec B101
    return await self._producer.publish(*args, **kwargs)
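
The arguments are forwarded to the underlying producer, so you typically pass the message body plus the target topic and optional key, partition, or headers. A sketch with illustrative topic names:

from faststream.confluent import KafkaBroker

broker = KafkaBroker("localhost:9092")


@broker.subscriber("input")
async def handler(msg: str) -> None:
    # publish may be called from any coroutine once the broker is connected.
    await broker.publish(msg.upper(), topic="output", key=b"example-key")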

publish_batch async #

publish_batch(*args: Any, **kwargs: Any) -> None

Publish a batch of messages to Kafka.

PARAMETER DESCRIPTION
*args

Positional arguments for message publishing.

TYPE: Any DEFAULT: ()

**kwargs

Keyword arguments for message publishing.

TYPE: Any DEFAULT: {}

RAISES DESCRIPTION
RuntimeError

If KafkaBroker is not started yet.

Source code in faststream/confluent/broker.py
async def publish_batch(
    self,
    *args: Any,
    **kwargs: Any,
) -> None:
    """Publish a batch of messages to Kafka.

    Args:
        *args: Positional arguments for message publishing.
        **kwargs: Keyword arguments for message publishing.

    Raises:
        RuntimeError: If KafkaBroker is not started yet.
    """
    assert self._producer, NOT_CONNECTED_YET  # nosec B101
    await self._producer.publish_batch(*args, **kwargs)
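
publish_batch treats every positional argument as a separate message and sends them in one producer call. A sketch (topic name is illustrative):

from faststream.confluent import KafkaBroker

broker = KafkaBroker("localhost:9092")


async def flush(events: list[dict]) -> None:
    # The broker must already be connected (e.g. inside a running app).
    await broker.publish_batch(*events, topic="events")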

publisher #

publisher(topic: str, key: bytes | None = None, partition: int | None = None, timestamp_ms: int | None = None, headers: dict[str, str] | None = None, reply_to: str = '', batch: bool = False, title: str | None = None, description: str | None = None, schema: Any | None = None, include_in_schema: bool = True) -> Publisher

Create a message publisher for the specified topic.

PARAMETER DESCRIPTION
topic

The topic to publish messages to.

TYPE: str

key

Message key.

TYPE: Optional[bytes] DEFAULT: None

partition

Partition to send the message to.

TYPE: Optional[int] DEFAULT: None

timestamp_ms

Message timestamp in milliseconds.

TYPE: Optional[int] DEFAULT: None

headers

Message headers.

TYPE: Optional[Dict[str, str]] DEFAULT: None

reply_to

The topic to which responses should be sent.

TYPE: str DEFAULT: ''

batch

Whether to publish messages in batches.

TYPE: bool DEFAULT: False

title

AsyncAPI title.

TYPE: Optional[str] DEFAULT: None

description

AsyncAPI description.

TYPE: Optional[str] DEFAULT: None

schema

AsyncAPI schema.

TYPE: Optional[Any] DEFAULT: None

include_in_schema

Whether to include the publisher in the AsyncAPI schema.

TYPE: bool DEFAULT: True

RETURNS DESCRIPTION
Publisher

A message publisher.

TYPE: Publisher

Source code in faststream/confluent/broker.py
@override
def publisher(  # type: ignore[override]
    self,
    topic: str,
    key: Optional[bytes] = None,
    partition: Optional[int] = None,
    timestamp_ms: Optional[int] = None,
    headers: Optional[Dict[str, str]] = None,
    reply_to: str = "",
    batch: bool = False,
    # AsyncAPI information
    title: Optional[str] = None,
    description: Optional[str] = None,
    schema: Optional[Any] = None,
    include_in_schema: bool = True,
) -> Publisher:
    """Create a message publisher for the specified topic.

    Args:
        topic (str): The topic to publish messages to.
        key (Optional[bytes]): Message key.
        partition (Optional[int]): Partition to send the message to.
        timestamp_ms (Optional[int]): Message timestamp in milliseconds.
        headers (Optional[Dict[str, str]]): Message headers.
        reply_to (str): The topic to which responses should be sent.
        batch (bool): Whether to publish messages in batches.
        title (Optional[str]): AsyncAPI title.
        description (Optional[str]): AsyncAPI description.
        schema (Optional[Any]): AsyncAPI schema.
        include_in_schema (bool): Whether to include the publisher in the AsyncAPI schema.

    Returns:
        Publisher: A message publisher.
    """
    publisher = self._publishers.get(
        topic,
        Publisher(
            topic=topic,
            client_id=self.client_id,
            key=key,
            batch=batch,
            partition=partition,
            timestamp_ms=timestamp_ms,
            headers=headers,
            reply_to=reply_to,
            title=title,
            _description=description,
            _schema=schema,
            include_in_schema=include_in_schema,
        ),
    )
    super().publisher(topic, publisher)
    if self._producer is not None:
        publisher._producer = self._producer
    return publisher
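
The returned Publisher is usually applied as a decorator, so a handler's return value is published to the target topic and included in the AsyncAPI schema. A sketch with illustrative topic names:

from faststream.confluent import KafkaBroker

broker = KafkaBroker("localhost:9092")

to_output = broker.publisher("output")


@to_output
@broker.subscriber("input")
async def handle(msg: str) -> str:
    # The returned value is sent to the "output" topic.
    return msg.lower()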

start async #

start() -> None

Start the KafkaBroker and message handlers.

Source code in faststream/confluent/broker.py
async def start(self) -> None:
    """Start the KafkaBroker and message handlers."""
    context.set_global(
        "default_log_context",
        self._get_log_context(None, ""),
    )

    await super().start()

    for handler in self.handlers.values():
        c = self._get_log_context(None, handler.topics, handler.group_id)
        self._log(f"`{handler.call_name}` waiting for messages", extra=c)
        await handler.start(**(self._connection or {}))

subscriber #

subscriber(*topics: str, group_id: Optional[str] = None, key_deserializer: Optional[Callable[[bytes], Any]] = None, value_deserializer: Optional[Callable[[bytes], Any]] = None, fetch_max_wait_ms: int = 500, fetch_max_bytes: int = 52428800, fetch_min_bytes: int = 1, max_partition_fetch_bytes: int = 1 * 1024 * 1024, auto_offset_reset: Literal['latest', 'earliest', 'none'] = 'latest', auto_commit: bool = True, auto_commit_interval_ms: int = 5000, check_crcs: bool = True, partition_assignment_strategy: Sequence[str] = ('roundrobin',), max_poll_interval_ms: int = 300000, rebalance_timeout_ms: Optional[int] = None, session_timeout_ms: int = 10000, heartbeat_interval_ms: int = 3000, consumer_timeout_ms: int = 200, max_poll_records: Optional[int] = None, exclude_internal_topics: bool = True, isolation_level: Literal['read_uncommitted', 'read_committed'] = 'read_uncommitted', dependencies: Sequence[Depends] = (), parser: Optional[Union[CustomParser[Message, KafkaMessage], CustomParser[Tuple[Message, ...], KafkaMessage]]] = None, decoder: Optional[CustomDecoder] = None, middlewares: Optional[Sequence[Callable[[Message], BaseMiddleware]]] = None, filter: Union[Filter[KafkaMessage], Filter[StreamMessage[Tuple[Message, ...]]]] = default_filter, batch: bool = False, max_records: Optional[int] = None, batch_timeout_ms: int = 200, no_ack: bool = False, title: Optional[str] = None, description: Optional[str] = None, include_in_schema: bool = True, **original_kwargs: Any) -> Callable[[Callable[P_HandlerParams, T_HandlerReturn]], Union[HandlerCallWrapper[Message, P_HandlerParams, T_HandlerReturn], HandlerCallWrapper[Tuple[Message, ...], P_HandlerParams, T_HandlerReturn]]]

Create a message subscriber for the specified topics.

PARAMETER DESCRIPTION
*topics

The topics to subscribe to.

TYPE: str DEFAULT: ()

group_id

The Kafka consumer group ID.

TYPE: Optional[str] DEFAULT: None

key_deserializer

Key deserializer function.

TYPE: Optional[Callable[[bytes], Any]] DEFAULT: None

value_deserializer

Value deserializer function.

TYPE: Optional[Callable[[bytes], Any]] DEFAULT: None

fetch_max_wait_ms

The maximum time to wait for data.

TYPE: int DEFAULT: 500

fetch_max_bytes

The maximum number of bytes to fetch.

TYPE: int DEFAULT: 52428800

fetch_min_bytes

The minimum number of bytes to fetch.

TYPE: int DEFAULT: 1

max_partition_fetch_bytes

The maximum bytes to fetch for a partition.

TYPE: int DEFAULT: 1 * 1024 * 1024

auto_offset_reset

Auto offset reset policy.

TYPE: Literal['latest', 'earliest', 'none'] DEFAULT: 'latest'

auto_commit

Whether to enable auto-commit.

TYPE: bool DEFAULT: True

auto_commit_interval_ms

Auto-commit interval in milliseconds.

TYPE: int DEFAULT: 5000

check_crcs

Whether to check CRCs.

TYPE: bool DEFAULT: True

partition_assignment_strategy

Partition assignment strategy.

TYPE: Sequence[str] DEFAULT: ('roundrobin',)

max_poll_interval_ms

Maximum poll interval in milliseconds.

TYPE: int DEFAULT: 300000

rebalance_timeout_ms

Rebalance timeout in milliseconds.

TYPE: Optional[int] DEFAULT: None

session_timeout_ms

Session timeout in milliseconds.

TYPE: int DEFAULT: 10000

heartbeat_interval_ms

Heartbeat interval in milliseconds.

TYPE: int DEFAULT: 3000

consumer_timeout_ms

Consumer timeout in milliseconds.

TYPE: int DEFAULT: 200

max_poll_records

Maximum number of records to poll.

TYPE: Optional[int] DEFAULT: None

exclude_internal_topics

Whether to exclude internal topics.

TYPE: bool DEFAULT: True

isolation_level

Isolation level.

TYPE: Literal['read_uncommitted', 'read_committed'] DEFAULT: 'read_uncommitted'

dependencies

Additional dependencies for message handling.

TYPE: Sequence[Depends] DEFAULT: ()

parser

Message parser.

TYPE: Optional[Union[CustomParser[Message], CustomParser[Tuple[Message, ...]]]] DEFAULT: None

decoder

Message decoder.

TYPE: Optional[CustomDecoder] DEFAULT: None

middlewares

Message middlewares.

TYPE: Optional[Sequence[Callable[[Message], BaseMiddleware]]] DEFAULT: None

filter

Message filter.

TYPE: Union[Filter[KafkaMessage], Filter[StreamMessage[Tuple[Message, ...]]]] DEFAULT: default_filter

batch

Whether to process messages in batches.

TYPE: bool DEFAULT: False

max_records

Maximum number of records to process in each batch.

TYPE: Optional[int] DEFAULT: None

batch_timeout_ms

Batch timeout in milliseconds.

TYPE: int DEFAULT: 200

no_ack

Whether to disable automatic ack/nack/reject of messages.

TYPE: bool DEFAULT: False

title

AsyncAPI title.

TYPE: Optional[str] DEFAULT: None

description

AsyncAPI description.

TYPE: Optional[str] DEFAULT: None

include_in_schema

Whether to include the handler in the AsyncAPI schema.

TYPE: bool DEFAULT: True

**original_kwargs

Additional keyword arguments.

TYPE: Any DEFAULT: {}

RETURNS DESCRIPTION
Callable

A decorator that wraps a message handler function.

TYPE: Callable[[Callable[P_HandlerParams, T_HandlerReturn]], Union[HandlerCallWrapper[Message, P_HandlerParams, T_HandlerReturn], HandlerCallWrapper[Tuple[Message, ...], P_HandlerParams, T_HandlerReturn]]]

Source code in faststream/confluent/broker.py
@override
def subscriber(  # type: ignore[override]
    self,
    *topics: str,
    group_id: Optional[str] = None,
    key_deserializer: Optional[Callable[[bytes], Any]] = None,
    value_deserializer: Optional[Callable[[bytes], Any]] = None,
    fetch_max_wait_ms: int = 500,
    fetch_max_bytes: int = 52428800,
    fetch_min_bytes: int = 1,
    max_partition_fetch_bytes: int = 1 * 1024 * 1024,
    auto_offset_reset: Literal[
        "latest",
        "earliest",
        "none",
    ] = "latest",
    auto_commit: bool = True,
    auto_commit_interval_ms: int = 5000,
    check_crcs: bool = True,
    partition_assignment_strategy: Sequence[str] = ("roundrobin",),
    max_poll_interval_ms: int = 300000,
    rebalance_timeout_ms: Optional[int] = None,
    session_timeout_ms: int = 10000,
    heartbeat_interval_ms: int = 3000,
    consumer_timeout_ms: int = 200,
    max_poll_records: Optional[int] = None,
    exclude_internal_topics: bool = True,
    isolation_level: Literal[
        "read_uncommitted",
        "read_committed",
    ] = "read_uncommitted",
    # broker arguments
    dependencies: Sequence[Depends] = (),
    parser: Optional[
        Union[
            CustomParser[confluent_kafka.Message, KafkaMessage],
            CustomParser[Tuple[confluent_kafka.Message, ...], KafkaMessage],
        ]
    ] = None,
    decoder: Optional[CustomDecoder] = None,
    middlewares: Optional[
        Sequence[
            Callable[
                [confluent_kafka.Message],
                BaseMiddleware,
            ]
        ]
    ] = None,
    filter: Union[
        Filter[KafkaMessage],
        Filter[StreamMessage[Tuple[confluent_kafka.Message, ...]]],
    ] = default_filter,
    batch: bool = False,
    max_records: Optional[int] = None,
    batch_timeout_ms: int = 200,
    no_ack: bool = False,
    # AsyncAPI information
    title: Optional[str] = None,
    description: Optional[str] = None,
    include_in_schema: bool = True,
    **original_kwargs: Any,
) -> Callable[
    [Callable[P_HandlerParams, T_HandlerReturn]],
    Union[
        HandlerCallWrapper[
            confluent_kafka.Message, P_HandlerParams, T_HandlerReturn
        ],
        HandlerCallWrapper[
            Tuple[confluent_kafka.Message, ...], P_HandlerParams, T_HandlerReturn
        ],
    ],
]:
    """Create a message subscriber for the specified topics.

    Args:
        *topics (str): The topics to subscribe to.
        group_id (Optional[str]): The Kafka consumer group ID.
        key_deserializer (Optional[Callable[[bytes], Any]]): Key deserializer function.
        value_deserializer (Optional[Callable[[bytes], Any]]): Value deserializer function.
        fetch_max_wait_ms (int): The maximum time to wait for data.
        fetch_max_bytes (int): The maximum number of bytes to fetch.
        fetch_min_bytes (int): The minimum number of bytes to fetch.
        max_partition_fetch_bytes (int): The maximum bytes to fetch for a partition.
        auto_offset_reset (Literal["latest", "earliest", "none"]): Auto offset reset policy.
        auto_commit (bool): Whether to enable auto-commit.
        auto_commit_interval_ms (int): Auto-commit interval in milliseconds.
        check_crcs (bool): Whether to check CRCs.
        partition_assignment_strategy (Sequence[AbstractPartitionAssignor]): Partition assignment strategy.
        max_poll_interval_ms (int): Maximum poll interval in milliseconds.
        rebalance_timeout_ms (Optional[int]): Rebalance timeout in milliseconds.
        session_timeout_ms (int): Session timeout in milliseconds.
        heartbeat_interval_ms (int): Heartbeat interval in milliseconds.
        consumer_timeout_ms (int): Consumer timeout in milliseconds.
        max_poll_records (Optional[int]): Maximum number of records to poll.
        exclude_internal_topics (bool): Whether to exclude internal topics.
        isolation_level (Literal["read_uncommitted", "read_committed"]): Isolation level.
        dependencies (Sequence[Depends]): Additional dependencies for message handling.
        parser (Optional[Union[CustomParser[confluent_kafka.Message], CustomParser[Tuple[confluent_kafka.Message, ...]]]]): Message parser.
        decoder (Optional[CustomDecoder]): Message decoder.
        middlewares (Optional[Sequence[Callable[[confluent_kafka.Message], BaseMiddleware]]]): Message middlewares.
        filter (Union[Filter[KafkaMessage], Filter[StreamMessage[Tuple[confluent_kafka.Message, ...]]]]): Message filter.
        batch (bool): Whether to process messages in batches.
        max_records (Optional[int]): Maximum number of records to process in each batch.
        batch_timeout_ms (int): Batch timeout in milliseconds.
        no_ack (bool): Whether not to ack/nack/reject messages.
        title (Optional[str]): AsyncAPI title.
        description (Optional[str]): AsyncAPI description.
        include_in_schema (bool): Whether to include the handler in the AsyncAPI schema.
        **original_kwargs: Additional keyword arguments.

    Returns:
        Callable: A decorator that wraps a message handler function.
    """
    super().subscriber()

    self._setup_log_context(topics, group_id)

    if not auto_commit and not group_id:
        raise ValueError("You should install `group_id` with manual commit mode")

    key = Handler.get_routing_hash(topics, group_id)
    builder = partial(
        AsyncConfluentConsumer,
        key_deserializer=key_deserializer,
        value_deserializer=value_deserializer,
        fetch_max_wait_ms=fetch_max_wait_ms,
        fetch_max_bytes=fetch_max_bytes,
        fetch_min_bytes=fetch_min_bytes,
        max_partition_fetch_bytes=max_partition_fetch_bytes,
        auto_offset_reset=auto_offset_reset,
        enable_auto_commit=auto_commit,
        auto_commit_interval_ms=auto_commit_interval_ms,
        check_crcs=check_crcs,
        partition_assignment_strategy=partition_assignment_strategy,
        max_poll_interval_ms=max_poll_interval_ms,
        rebalance_timeout_ms=rebalance_timeout_ms,
        session_timeout_ms=session_timeout_ms,
        heartbeat_interval_ms=heartbeat_interval_ms,
        consumer_timeout_ms=consumer_timeout_ms,
        max_poll_records=max_poll_records,
        exclude_internal_topics=exclude_internal_topics,
        isolation_level=isolation_level,
    )
    handler = self.handlers.get(
        key,
        Handler(
            *topics,
            log_context_builder=partial(
                self._get_log_context,
                topics=topics,
                group_id=group_id,
            ),
            is_manual=not auto_commit,
            group_id=group_id,
            client_id=self.client_id,
            builder=builder,
            description=description,
            title=title,
            batch=batch,
            batch_timeout_ms=batch_timeout_ms,
            max_records=max_records,
            include_in_schema=include_in_schema,
        ),
    )

    self.handlers[key] = handler

    def consumer_wrapper(
        func: Callable[P_HandlerParams, T_HandlerReturn],
    ) -> HandlerCallWrapper[
        confluent_kafka.Message, P_HandlerParams, T_HandlerReturn
    ]:
        """A wrapper function for a consumer handler.

        Args:
            func : The consumer handler function to be wrapped.

        Returns:
            The wrapped handler call.

        Raises:
            NotImplementedError: If silent animals are not supported.
        !!! note

            The above docstring is autogenerated by docstring-gen library (https://docstring-gen.airt.ai)
        """
        handler_call, dependant = self._wrap_handler(
            func=func,
            extra_dependencies=dependencies,
            no_ack=no_ack,
            **original_kwargs,
        )

        handler.add_call(
            handler=handler_call,
            filter=filter,
            middlewares=middlewares,
            parser=parser or self._global_parser,
            decoder=decoder or self._global_decoder,
            dependant=dependant,
        )

        return handler_call

    return consumer_wrapper
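
Typical subscriber declarations; with batch=True the handler receives a list of decoded messages instead of a single one. Topic and group names are illustrative:

from faststream import FastStream
from faststream.confluent import KafkaBroker

broker = KafkaBroker("localhost:9092")
app = FastStream(broker)


@broker.subscriber("orders", group_id="order-processors", auto_offset_reset="earliest")
async def handle_order(order: dict) -> None:
    ...


@broker.subscriber("metrics", batch=True, max_records=100)
async def handle_metrics(batch: list[dict]) -> None:
    # With batch=True the decoded payloads arrive as a single list.
    ...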