KafkaRouter
faststream.kafka.KafkaRouter #
KafkaRouter(prefix: str = '', handlers: Sequence[KafkaRoute] = (), *, dependencies: Sequence[Depends] = (), middlewares: Sequence[Callable[[ConsumerRecord], BaseMiddleware]] | None = None, parser: CustomParser[ConsumerRecord, KafkaMessage] | None = None, decoder: CustomDecoder[KafkaMessage] | None = None, include_in_schema: bool = True)
Bases: KafkaRouter
A class to represent a Kafka router.
METHOD | DESCRIPTION |
---|---|
_get_publisher_key | Get the key for a publisher |
_update_publisher_prefix | Update the prefix of a publisher |
publisher | Create a new publisher |
Source code in faststream/kafka/router.py
include_router #
include_router(router: BrokerRouter[PublisherKeyType, MsgType]) -> None
Includes a router in the current object.
PARAMETER | DESCRIPTION |
---|---|
router | The router to be included. TYPE: |
RETURNS | DESCRIPTION |
---|---|
None | None |
Source code in faststream/broker/router.py
include_routers #
include_routers(*routers: BrokerRouter[PublisherKeyType, MsgType]) -> None
Includes routers in the object.
PARAMETER | DESCRIPTION |
---|---|
*routers | Variable length argument list of routers to include. TYPE: |
RETURNS | DESCRIPTION |
---|---|
None | None |
Source code in faststream/broker/router.py
publisher #
publisher(topic: str, key: bytes | None = None, partition: int | None = None, timestamp_ms: int | None = None, headers: dict[str, str] | None = None, reply_to: str = '', batch: bool = False, title: str | None = None, description: str | None = None, schema: Any | None = None, include_in_schema: bool = True) -> Publisher
Publishes a message to a topic.
PARAMETER | DESCRIPTION |
---|---|
topic | The topic to publish the message to. TYPE: |
key | The key associated with the message. TYPE: |
partition | The partition to publish the message to. TYPE: |
timestamp_ms | The timestamp of the message in milliseconds. TYPE: |
headers | Additional headers for the message. |
reply_to | The topic to reply to. TYPE: |
batch | Whether to publish the message as part of a batch. TYPE: |
title | The title of the message. TYPE: |
description | The description of the message. TYPE: |
schema | The schema of the message. TYPE: |
include_in_schema | Whether to include the message in the API specification. TYPE: |
RETURNS | DESCRIPTION |
---|---|
Publisher | The publisher object used to publish the message. TYPE: |
Source code in faststream/kafka/router.py
subscriber #
subscriber(*topics: str, group_id: str | None = None, key_deserializer: Callable[[bytes], Any] | None = None, value_deserializer: Callable[[bytes], Any] | None = None, fetch_max_wait_ms: int = 500, fetch_max_bytes: int = 52428800, fetch_min_bytes: int = 1, max_partition_fetch_bytes: int = 1 * 1024 * 1024, auto_offset_reset: Literal['latest', 'earliest', 'none'] = 'latest', auto_commit: bool = True, auto_commit_interval_ms: int = 5000, check_crcs: bool = True, partition_assignment_strategy: Sequence[AbstractPartitionAssignor] = (RoundRobinPartitionAssignor), max_poll_interval_ms: int = 300000, rebalance_timeout_ms: int | None = None, session_timeout_ms: int = 10000, heartbeat_interval_ms: int = 3000, consumer_timeout_ms: int = 200, max_poll_records: int | None = None, exclude_internal_topics: bool = True, isolation_level: Literal['read_uncommitted', 'read_committed'] = 'read_uncommitted', dependencies: Sequence[Depends] = (), parser: CustomParser[ConsumerRecord, KafkaMessage] | None = None, decoder: CustomDecoder[KafkaMessage] | None = None, middlewares: Sequence[Callable[[ConsumerRecord], BaseMiddleware]] | None = None, filter: Filter[KafkaMessage] = default_filter, batch: bool = False, max_records: int | None = None, batch_timeout_ms: int = 200, retry: bool | int = False, no_ack: bool = False, title: str | None = None, description: str | None = None, include_in_schema: bool = True, **__service_kwargs: Any) -> Callable[[Callable[P_HandlerParams, T_HandlerReturn]], HandlerCallWrapper[ConsumerRecord, P_HandlerParams, T_HandlerReturn]]