Skip to content

KafkaPublisher

faststream.kafka.KafkaPublisher #

KafkaPublisher(
    topic,
    *,
    key=None,
    partition=None,
    headers=None,
    reply_to="",
    batch=False,
    middlewares=(),
    title=None,
    description=None,
    schema=None,
    include_in_schema=True,
)

Bases: ArgsContainer

Delayed KafkaPublisher registration object.

Just a copy of KafkaRegistrator.publisher(...) arguments.

Source code in faststream/kafka/router.py
def __init__(
    self,
    topic: Annotated[
        str,
        Doc("Topic where the message will be published."),
    ],
    *,
    key: Annotated[
        Union[bytes, Any, None],
        Doc(
            """
        A key to associate with the message. Can be used to
        determine which partition to send the message to. If partition
        is `None` (and producer's partitioner config is left as default),
        then messages with the same key will be delivered to the same
        partition (but if key is `None`, partition is chosen randomly).
        Must be type `bytes`, or be serializable to bytes via configured
        `key_serializer`.
        """
        ),
    ] = None,
    partition: Annotated[
        Optional[int],
        Doc(
            """
        Specify a partition. If not set, the partition will be
        selected using the configured `partitioner`.
        """
        ),
    ] = None,
    headers: Annotated[
        Optional[Dict[str, str]],
        Doc(
            "Message headers to store metainformation. "
            "**content-type** and **correlation_id** will be set automatically by framework anyway. "
            "Can be overridden by `publish.headers` if specified."
        ),
    ] = None,
    reply_to: Annotated[
        str,
        Doc("Topic name to send response."),
    ] = "",
    batch: Annotated[
        bool,
        Doc("Whether to send messages in batches or not."),
    ] = False,
    # basic args
    middlewares: Annotated[
        Sequence["PublisherMiddleware"],
        Doc("Publisher middlewares to wrap outgoing messages."),
    ] = (),
    # AsyncAPI args
    title: Annotated[
        Optional[str],
        Doc("AsyncAPI publisher object title."),
    ] = None,
    description: Annotated[
        Optional[str],
        Doc("AsyncAPI publisher object description."),
    ] = None,
    schema: Annotated[
        Optional[Any],
        Doc(
            "AsyncAPI publishing message type. "
            "Should be any python-native object annotation or `pydantic.BaseModel`."
        ),
    ] = None,
    include_in_schema: Annotated[
        bool,
        Doc("Whetever to include operation in AsyncAPI schema or not."),
    ] = True,
) -> None:
    super().__init__(
        topic=topic,
        key=key,
        partition=partition,
        batch=batch,
        headers=headers,
        reply_to=reply_to,
        # basic args
        middlewares=middlewares,
        # AsyncAPI args
        title=title,
        description=description,
        schema=schema,
        include_in_schema=include_in_schema,
    )

args instance-attribute #

args = args

kwargs instance-attribute #

kwargs = kwargs