KafkaRouter
faststream.kafka.KafkaRouter
KafkaRouter(
prefix: str = "",
handlers: Sequence[KafkaRoute] = (),
*,
dependencies: Sequence[Depends] = (),
middlewares: Sequence[
Callable[[ConsumerRecord], BaseMiddleware]
]
| None = None,
parser: CustomParser[ConsumerRecord, KafkaMessage]
| None = None,
decoder: CustomDecoder[KafkaMessage] | None = None,
include_in_schema: bool = True
)
Bases: KafkaRouter
A class to represent a Kafka router.
METHOD | DESCRIPTION |
---|---|
_get_publisher_key | Get the key for a publisher |
_update_publisher_prefix | Update the prefix of a publisher |
publisher | Create a new publisher |
Source code in faststream/kafka/router.py
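The `prefix` argument is not described on this page. As a rough illustration, assuming the router simply prepends `prefix` to every topic name registered through it (an assumption about behavior, not a copy of the FastStream implementation), the resulting topic names could be modeled as follows. The helper `apply_prefix` is hypothetical and exists only for this sketch.

```python
def apply_prefix(prefix: str, topics: tuple[str, ...]) -> tuple[str, ...]:
    """Model of the assumed behavior: prepend the router prefix to each topic."""
    return tuple(prefix + topic for topic in topics)


# With the default empty prefix, topic names are unchanged.
unprefixed = apply_prefix("", ("orders", "payments"))

# With a prefix, every topic gets the same leading string.
prefixed = apply_prefix("staging_", ("orders", "payments"))
```

Under this assumption, a router created with `prefix="staging_"` and a handler registered for `"orders"` would consume from `"staging_orders"`.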
include_router
include_router(
router: BrokerRouter[PublisherKeyType, MsgType]
) -> None
Includes a router in the current object.
PARAMETER | DESCRIPTION |
---|---|
router | The router to be included. TYPE: BrokerRouter[PublisherKeyType, MsgType] |
RETURNS | DESCRIPTION |
---|---|
None | None |
Source code in faststream/broker/router.py
include_routers
include_routers(
*routers: BrokerRouter[PublisherKeyType, MsgType]
) -> None
Includes routers in the object.
PARAMETER | DESCRIPTION |
---|---|
*routers | Variable length argument list of routers to include. TYPE: BrokerRouter[PublisherKeyType, MsgType] |
RETURNS | DESCRIPTION |
---|---|
None | None |
Source code in faststream/broker/router.py
publisher
publisher(
topic: str,
key: bytes | None = None,
partition: int | None = None,
timestamp_ms: int | None = None,
headers: dict[str, str] | None = None,
reply_to: str = "",
batch: bool = False,
title: str | None = None,
description: str | None = None,
schema: Any | None = None,
include_in_schema: bool = True,
) -> Publisher
Creates a publisher for the given topic.
PARAMETER | DESCRIPTION |
---|---|
topic | The topic to publish the message to. TYPE: str |
key | The key associated with the message. TYPE: Optional[bytes] DEFAULT: None |
partition | The partition to publish the message to. TYPE: Optional[int] DEFAULT: None |
timestamp_ms | The timestamp of the message in milliseconds. TYPE: Optional[int] DEFAULT: None |
headers | Additional headers for the message. TYPE: Optional[dict[str, str]] DEFAULT: None |
reply_to | The topic to reply to. TYPE: str DEFAULT: "" |
batch | Whether to publish the message as part of a batch. TYPE: bool DEFAULT: False |
title | The title of the message. TYPE: Optional[str] DEFAULT: None |
description | The description of the message. TYPE: Optional[str] DEFAULT: None |
schema | The schema of the message. TYPE: Optional[Any] DEFAULT: None |
include_in_schema | Whether to include the message in the API specification. TYPE: bool DEFAULT: True |
RETURNS | DESCRIPTION |
---|---|
Publisher | The publisher object used to publish the message. TYPE: Publisher |
Source code in faststream/kafka/router.py
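The argument types in the `publisher` signature are easy to mix up: `key` is raw bytes, header values are strings, and `timestamp_ms` is an integer number of milliseconds. The sketch below prepares values of the right types using only the standard library. The helper name `make_publisher_options` and the sample values are hypothetical.

```python
import time
from typing import Any


def make_publisher_options(user_id: int, trace_id: str) -> dict[str, Any]:
    """Build keyword arguments matching the `publisher` signature."""
    return {
        # `key` must be bytes, so encode text explicitly.
        "key": f"user-{user_id}".encode("utf-8"),
        # `timestamp_ms` is an integer count of milliseconds since the epoch.
        "timestamp_ms": int(time.time() * 1000),
        # `headers` maps str to str; convert non-string values first.
        "headers": {"trace-id": trace_id, "user-id": str(user_id)},
    }


options = make_publisher_options(user_id=42, trace_id="abc123")
```

With a router instance at hand, the dictionary could be unpacked as keyword arguments, for example `router.publisher("orders", **options)`.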
subscriber
subscriber(
*topics: str,
group_id: str | None = None,
key_deserializer: Callable[[bytes], Any] | None = None,
value_deserializer: Callable[[bytes], Any]
| None = None,
fetch_max_wait_ms: int = 500,
fetch_max_bytes: int = 52428800,
fetch_min_bytes: int = 1,
max_partition_fetch_bytes: int = 1 * 1024 * 1024,
auto_offset_reset: Literal[
"latest", "earliest", "none"
] = "latest",
auto_commit: bool = True,
auto_commit_interval_ms: int = 5000,
check_crcs: bool = True,
partition_assignment_strategy: Sequence[
AbstractPartitionAssignor
] = (RoundRobinPartitionAssignor,),
max_poll_interval_ms: int = 300000,
rebalance_timeout_ms: int | None = None,
session_timeout_ms: int = 10000,
heartbeat_interval_ms: int = 3000,
consumer_timeout_ms: int = 200,
max_poll_records: int | None = None,
exclude_internal_topics: bool = True,
isolation_level: Literal[
"read_uncommitted", "read_committed"
] = "read_uncommitted",
dependencies: Sequence[Depends] = (),
parser: CustomParser[ConsumerRecord, KafkaMessage]
| None = None,
decoder: CustomDecoder[KafkaMessage] | None = None,
middlewares: Sequence[
Callable[[ConsumerRecord], BaseMiddleware]
]
| None = None,
filter: Filter[KafkaMessage] = default_filter,
batch: bool = False,
max_records: int | None = None,
batch_timeout_ms: int = 200,
retry: bool | int = False,
no_ack: bool = False,
title: str | None = None,
description: str | None = None,
include_in_schema: bool = True,
**__service_kwargs: Any
) -> Callable[
[Callable[P_HandlerParams, T_HandlerReturn]],
HandlerCallWrapper[
ConsumerRecord, P_HandlerParams, T_HandlerReturn
],
]
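`key_deserializer` and `value_deserializer` accept any callable that takes `bytes` and returns a Python object. A minimal sketch of two such callables follows, assuming keys are UTF-8 text and values are JSON documents. Both are assumptions about the payload format, and the function names are hypothetical.

```python
import json
from typing import Any, Optional


def decode_key(raw: Optional[bytes]) -> Optional[str]:
    """Interpret the record key as UTF-8 text.

    Records may be produced without a key, so None is passed through
    defensively instead of raising.
    """
    if raw is None:
        return None
    return raw.decode("utf-8")


def decode_json_value(raw: bytes) -> Any:
    """Parse the record value as a UTF-8 encoded JSON document."""
    return json.loads(raw.decode("utf-8"))


# Quick check with sample payloads.
sample_key = decode_key(b"user-42")
sample_value = decode_json_value(b'{"amount": 10, "currency": "EUR"}')
```

These could then be passed as `key_deserializer=decode_key` and `value_deserializer=decode_json_value` when calling `subscriber(...)`.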