Skip to content

KafkaRouter

faststream.kafka.KafkaRouter #

KafkaRouter(
    prefix: str = "",
    handlers: Sequence[KafkaRoute] = (),
    *,
    dependencies: Sequence[Depends] = (),
    middlewares: Sequence[
        Callable[[ConsumerRecord], BaseMiddleware]
    ]
    | None = None,
    parser: CustomParser[ConsumerRecord, KafkaMessage]
    | None = None,
    decoder: CustomDecoder[KafkaMessage] | None = None,
    include_in_schema: bool = True
)

Bases: KafkaRouter

A class to represent a Kafka router.

METHOD DESCRIPTION
_get_publisher_key

Get the key for a publisher

_update_publisher_prefix

Update the prefix of a publisher

publisher

Create a new publisher

Source code in faststream/kafka/router.py
    Args:
        publisher: The publisher object.

    Returns:
        The publisher key.

    """
    return publisher.topic

@override
@staticmethod
def _update_publisher_prefix(  # type: ignore[override]

include_in_schema instance-attribute #

include_in_schema = include_in_schema

prefix instance-attribute #

prefix: str = prefix

include_router #

include_router(
    router: BrokerRouter[PublisherKeyType, MsgType]
) -> None

Includes a router in the current object.

PARAMETER DESCRIPTION
router

The router to be included.

TYPE: BrokerRouter[PublisherKeyType, MsgType]

RETURNS DESCRIPTION
None

None

Source code in faststream/broker/router.py
def include_router(self, router: "BrokerRouter[PublisherKeyType, MsgType]") -> None:
    """Includes a router in the current object.

    Args:
        router: The router to be included.

    Returns:
        None

    """
    for h in router._handlers:
        self.subscriber(*h.args, **h.kwargs)(h.call)

    for p in router._publishers.values():
        p = self._update_publisher_prefix(self.prefix, p)
        key = self._get_publisher_key(p)
        self._publishers[key] = self._publishers.get(key, p)

include_routers #

include_routers(
    *routers: BrokerRouter[PublisherKeyType, MsgType]
) -> None

Includes routers in the object.

PARAMETER DESCRIPTION
*routers

Variable length argument list of routers to include.

TYPE: BrokerRouter[PublisherKeyType, MsgType] DEFAULT: ()

RETURNS DESCRIPTION
None

None

Source code in faststream/broker/router.py
def include_routers(
    self, *routers: "BrokerRouter[PublisherKeyType, MsgType]"
) -> None:
    """Includes routers in the object.

    Args:
        *routers: Variable length argument list of routers to include.

    Returns:
        None

    """
    for r in routers:
        self.include_router(r)

publisher #

publisher(
    topic: str,
    key: bytes | None = None,
    partition: int | None = None,
    timestamp_ms: int | None = None,
    headers: dict[str, str] | None = None,
    reply_to: str = "",
    batch: bool = False,
    title: str | None = None,
    description: str | None = None,
    schema: Any | None = None,
    include_in_schema: bool = True,
) -> Publisher

Publishes a message to a topic.

PARAMETER DESCRIPTION
topic

The topic to publish the message to.

TYPE: str

key

The key associated with the message.

TYPE: bytes DEFAULT: None

partition

The partition to publish the message to.

TYPE: int DEFAULT: None

timestamp_ms

The timestamp of the message in milliseconds.

TYPE: int DEFAULT: None

headers

Additional headers for the message.

TYPE: Dict[str, str] DEFAULT: None

reply_to

The topic to reply to.

TYPE: str DEFAULT: ''

batch

Whether to publish the message as part of a batch.

TYPE: bool DEFAULT: False

title

The title of the message.

TYPE: str DEFAULT: None

description

The description of the message.

TYPE: str DEFAULT: None

schema

The schema of the message.

TYPE: Any DEFAULT: None

include_in_schema

Whether to include the message in the API specification.

TYPE: bool DEFAULT: True

RETURNS DESCRIPTION
Publisher

The publisher object used to publish the message.

TYPE: Publisher

Source code in faststream/kafka/router.py
@override
def publisher(  # type: ignore[override]
    self,
    topic: str,
    key: Optional[bytes] = None,
    partition: Optional[int] = None,
    timestamp_ms: Optional[int] = None,
    headers: Optional[Dict[str, str]] = None,
    reply_to: str = "",
    batch: bool = False,
    # AsyncAPI information
    title: Optional[str] = None,
    description: Optional[str] = None,
    schema: Optional[Any] = None,
    include_in_schema: bool = True,
) -> Publisher:
    """Publishes a message to a topic.

    Args:
        topic (str): The topic to publish the message to.
        key (bytes, optional): The key associated with the message.
        partition (int, optional): The partition to publish the message to.
        timestamp_ms (int, optional): The timestamp of the message in milliseconds.
        headers (Dict[str, str], optional): Additional headers for the message.
        reply_to (str, optional): The topic to reply to.
        batch (bool, optional): Whether to publish the message as part of a batch.
        title (str, optional): The title of the message.
        description (str, optional): The description of the message.
        schema (Any, optional): The schema of the message.
        include_in_schema (bool, optional): Whether to include the message in the API specification.

    Returns:
        Publisher: The publisher object used to publish the message.

    """
    new_publisher = self._update_publisher_prefix(
        self.prefix,
        Publisher(
            topic=topic,
            key=key,
            partition=partition,
            timestamp_ms=timestamp_ms,
            headers=headers,
            reply_to=reply_to,
            title=title,
            batch=batch,
            _description=description,
            _schema=schema,
            include_in_schema=(
                include_in_schema
                if self.include_in_schema is None
                else self.include_in_schema
            ),
        ),
    )
    publisher_key = self._get_publisher_key(new_publisher)
    publisher = self._publishers[publisher_key] = self._publishers.get(
        publisher_key, new_publisher
    )
    return publisher

subscriber #

subscriber(
    *topics: str,
    group_id: str | None = None,
    key_deserializer: Callable[[bytes], Any] | None = None,
    value_deserializer: Callable[[bytes], Any]
    | None = None,
    fetch_max_wait_ms: int = 500,
    fetch_max_bytes: int = 52428800,
    fetch_min_bytes: int = 1,
    max_partition_fetch_bytes: int = 1 * 1024 * 1024,
    auto_offset_reset: Literal[
        "latest", "earliest", "none"
    ] = "latest",
    auto_commit: bool = True,
    auto_commit_interval_ms: int = 5000,
    check_crcs: bool = True,
    partition_assignment_strategy: Sequence[
        AbstractPartitionAssignor
    ] = (RoundRobinPartitionAssignor),
    max_poll_interval_ms: int = 300000,
    rebalance_timeout_ms: int | None = None,
    session_timeout_ms: int = 10000,
    heartbeat_interval_ms: int = 3000,
    consumer_timeout_ms: int = 200,
    max_poll_records: int | None = None,
    exclude_internal_topics: bool = True,
    isolation_level: Literal[
        "read_uncommitted", "read_committed"
    ] = "read_uncommitted",
    dependencies: Sequence[Depends] = (),
    parser: CustomParser[ConsumerRecord, KafkaMessage]
    | None = None,
    decoder: CustomDecoder[KafkaMessage] | None = None,
    middlewares: Sequence[
        Callable[[ConsumerRecord], BaseMiddleware]
    ]
    | None = None,
    filter: Filter[KafkaMessage] = default_filter,
    batch: bool = False,
    max_records: int | None = None,
    batch_timeout_ms: int = 200,
    retry: bool | int = False,
    no_ack: bool = False,
    title: str | None = None,
    description: str | None = None,
    include_in_schema: bool = True,
    **__service_kwargs: Any
) -> Callable[
    [Callable[P_HandlerParams, T_HandlerReturn]],
    HandlerCallWrapper[
        ConsumerRecord, P_HandlerParams, T_HandlerReturn
    ],
]
Source code in faststream/kafka/router.py
    batch: bool = False,
    # AsyncAPI information
    title: Optional[str] = None,
    description: Optional[str] = None,
    schema: Optional[Any] = None,
    include_in_schema: bool = True,
) -> Publisher:
    """Publishes a message to a topic.

    Args:
        topic (str): The topic to publish the message to.
        key (bytes, optional): The key associated with the message.
        partition (int, optional): The partition to publish the message to.
        timestamp_ms (int, optional): The timestamp of the message in milliseconds.
        headers (Dict[str, str], optional): Additional headers for the message.
        reply_to (str, optional): The topic to reply to.
        batch (bool, optional): Whether to publish the message as part of a batch.
        title (str, optional): The title of the message.
        description (str, optional): The description of the message.
        schema (Any, optional): The schema of the message.
        include_in_schema (bool, optional): Whether to include the message in the API specification.

    Returns:
        Publisher: The publisher object used to publish the message.

    """
    new_publisher = self._update_publisher_prefix(
        self.prefix,
        Publisher(
            topic=topic,
            key=key,
            partition=partition,
            timestamp_ms=timestamp_ms,
            headers=headers,
            reply_to=reply_to,
            title=title,
            batch=batch,
            _description=description,
            _schema=schema,
            include_in_schema=(
                include_in_schema
                if self.include_in_schema is None
                else self.include_in_schema
            ),
        ),
    )
    publisher_key = self._get_publisher_key(new_publisher)
    publisher = self._publishers[publisher_key] = self._publishers.get(
        publisher_key, new_publisher
    )
    return publisher