Skip to content

LogicPublisher

faststream.confluent.publisher.LogicPublisher dataclass #

LogicPublisher(include_in_schema: bool = True, title: Optional[str] = None, _description: Optional[str] = None, _schema: Optional[Any] = None, _fake_handler: bool = False, topic: str = '', key: Optional[bytes] = None, partition: Optional[int] = None, timestamp_ms: Optional[int] = None, headers: Optional[Dict[str, str]] = None, reply_to: Optional[str] = '', batch: bool = False, client_id: str = 'faststream-' + __version__)

Bases: ABCPublisher[Message]

A class to publish messages to a Kafka topic.

METHOD DESCRIPTION
publish

Publishes messages to the Kafka topic

RAISES DESCRIPTION
AssertionError

If _producer is not set up or if multiple messages are sent without the batch flag

batch class-attribute instance-attribute #

batch: bool = field(default=False)

calls class-attribute instance-attribute #

calls: List[Callable[..., Any]] = field(init=False, default_factory=list, repr=False)

client_id class-attribute instance-attribute #

client_id: str = field(default='faststream-' + __version__)

description property #

description: Optional[str]

headers class-attribute instance-attribute #

headers: Optional[Dict[str, str]] = None

include_in_schema class-attribute instance-attribute #

include_in_schema: bool = field(default=True)

key class-attribute instance-attribute #

key: Optional[bytes] = None

mock class-attribute instance-attribute #

mock: Optional[MagicMock] = field(init=False, default=None, repr=False)

partition class-attribute instance-attribute #

partition: Optional[int] = None

reply_to class-attribute instance-attribute #

reply_to: Optional[str] = ''

timestamp_ms class-attribute instance-attribute #

timestamp_ms: Optional[int] = None

title class-attribute instance-attribute #

title: Optional[str] = field(default=None)

topic class-attribute instance-attribute #

topic: str = ''

get_payloads #

get_payloads() -> List[Tuple[AnyDict, str]]
Source code in faststream/broker/publisher.py
def get_payloads(self) -> List[Tuple[AnyDict, str]]:
    """Collect the response payload schemas produced by this publisher.

    When ``self._schema`` is set, a synthetic call model is built whose
    response model has a single ``response__`` field typed with that schema,
    and at most one ``(body, "")`` pair is returned. Otherwise each callable
    in ``self.calls`` is turned into a call model and contributes one
    ``(body, name)`` pair, where ``name`` is the camel-cased ``__name__`` of
    the unwrapped callable.

    Returns:
        A list of ``(schema body, name)`` tuples. Entries for which
        ``get_response_schema`` yields a falsy body are left out.
    """
    if not self._schema:
        # No explicit schema: derive one payload per registered call.
        collected: List[Tuple[AnyDict, str]] = []
        for handler in self.calls:
            response_body = get_response_schema(
                build_call_model(handler),
                prefix=f"{self.name}:Message",
            )
            if not response_body:
                continue
            collected.append(
                (response_body, to_camelcase(unwrap(handler).__name__))
            )
        return collected

    # Explicit schema: wrap it in a throwaway call model so the same
    # response-schema helper can be reused.
    response_fields = {"response__": (self._schema, ...)}
    fake_model = create_model("Fake")
    response_model = create_model(  # type: ignore[call-overload]
        "",
        __config__=get_config_base(),  # type: ignore[arg-type]
        **response_fields,  # type: ignore[arg-type]
    )
    schema_call: CallModel[Any, Any] = CallModel(
        call=lambda: None,
        model=fake_model,
        response_model=response_model,
        params=response_fields,
    )

    schema_body = get_response_schema(
        schema_call,
        prefix=f"{self.name}:Message",
    )
    explicit: List[Tuple[AnyDict, str]] = []
    if schema_body:  # pragma: no branch
        explicit.append((schema_body, ""))
    return explicit

name #

name() -> str

Returns the name of the API operation.

Source code in faststream/asyncapi/base.py
@abstractproperty
def name(self) -> str:
    """Return the name of the API operation.

    Declared abstract; this base implementation has no value to return and
    only raises.

    Raises:
        NotImplementedError: Always, when the base implementation is invoked.
    """
    raise NotImplementedError

publish async #

publish(*messages: SendableMessage, message: SendableMessage = '', key: Optional[bytes] = None, partition: Optional[int] = None, timestamp_ms: Optional[int] = None, headers: Optional[Dict[str, str]] = None, correlation_id: Optional[str] = None) -> None

Publish messages to a topic.

PARAMETER DESCRIPTION
*messages

Variable length argument list of SendableMessage objects.

TYPE: SendableMessage DEFAULT: ()

message

A SendableMessage object. Default is an empty string.

TYPE: SendableMessage DEFAULT: ''

key

Optional bytes object representing the message key.

TYPE: Optional[bytes] DEFAULT: None

partition

Optional integer representing the partition to publish the message to.

TYPE: Optional[int] DEFAULT: None

timestamp_ms

Optional integer representing the timestamp of the message in milliseconds.

TYPE: Optional[int] DEFAULT: None

headers

Optional dictionary of header key-value pairs.

TYPE: Optional[Dict[str, str]] DEFAULT: None

correlation_id

Optional string representing the correlation ID of the message.

TYPE: Optional[str] DEFAULT: None

RETURNS DESCRIPTION
None

None

RAISES DESCRIPTION
AssertionError

If _producer is not set up.

AssertionError

If batch flag is not set and there are multiple messages.

ValueError

If the batch flag is set, messages is empty, and message is not a sequence.

Source code in faststream/confluent/publisher.py
@override
async def publish(  # type: ignore[override]
    self,
    *messages: SendableMessage,
    message: SendableMessage = "",
    key: Optional[bytes] = None,
    partition: Optional[int] = None,
    timestamp_ms: Optional[int] = None,
    headers: Optional[Dict[str, str]] = None,
    correlation_id: Optional[str] = None,
) -> None:
    """Publish messages to a topic.

    Per-call arguments override the publisher-level defaults stored on the
    instance (``self.key``, ``self.partition``, ``self.timestamp_ms``,
    ``self.headers``). ``partition`` and ``timestamp_ms`` fall back to the
    instance default only when they are ``None``, so an explicit ``0`` is
    honoured.

    Args:
        *messages: Variable length argument list of SendableMessage objects.
        message: A SendableMessage object. Default is an empty string.
            Used only when no positional ``messages`` are given.
        key: Optional bytes object representing the message key
            (non-batch mode only).
        partition: Optional integer representing the partition to publish the message to.
        timestamp_ms: Optional integer representing the timestamp of the message in milliseconds.
        headers: Optional dictionary of header key-value pairs.
        correlation_id: Optional string representing the correlation ID of
            the message (non-batch mode only).

    Returns:
        None

    Raises:
        AssertionError: If `_producer` is not set up.
        AssertionError: If `batch` flag is not set and there are multiple messages.
        AssertionError: If no outgoing `topic` is configured.
        ValueError: If `batch` is set, `messages` is empty and `message` is not a sequence.

    """
    assert self._producer, NOT_CONNECTED_YET  # nosec B101
    assert (  # nosec B101
        self.batch or len(messages) < 2
    ), "You can't send multiple messages without `batch` flag"
    assert self.topic, "You have to specify outgoing topic"  # nosec B101

    # Test against None instead of truthiness: 0 is a legitimate partition
    # number (and timestamp) and must not be replaced by the default.
    target_partition = self.partition if partition is None else partition
    send_timestamp_ms = (
        self.timestamp_ms if timestamp_ms is None else timestamp_ms
    )

    if not self.batch:
        return await self._producer.publish(
            # First positional message wins; otherwise use the keyword one.
            message=next(iter(messages), message),
            topic=self.topic,
            key=key or self.key,
            partition=target_partition,
            timestamp_ms=send_timestamp_ms,
            correlation_id=correlation_id,
            headers=headers or self.headers,
            reply_to=self.reply_to or "",
        )
    else:
        to_send: Sequence[SendableMessage]
        if not messages:
            # Batch mode without positional messages: the keyword `message`
            # itself must hold the batch.
            if not isinstance(message, Sequence):
                raise ValueError(
                    f"Message: {message} should be Sequence type to send in batch"
                )
            else:
                to_send = message
        else:
            to_send = messages

        await self._producer.publish_batch(
            *to_send,
            topic=self.topic,
            partition=target_partition,
            timestamp_ms=send_timestamp_ms,
            headers=headers or self.headers,
        )
        return None

reset_test #

reset_test() -> None
Source code in faststream/broker/publisher.py
def reset_test(self) -> None:
    """Clear the test state: drop the mock and turn the fake-handler flag off."""
    self.mock = None
    self._fake_handler = False

schema #

schema() -> Dict[str, Channel]

Returns the schema of the API operation as a dictionary of channel names and channel objects.

Source code in faststream/asyncapi/base.py
def schema(self) -> Dict[str, Channel]:  # pragma: no cover
    """Return the schema of the API operation.

    Returns:
        A dictionary of channel names and channel objects. This
        implementation always returns an empty dictionary.
    """
    channels: Dict[str, Channel] = {}
    return channels

set_test #

set_test(mock: MagicMock, with_fake: bool) -> None
Source code in faststream/broker/publisher.py
def set_test(self, mock: MagicMock, with_fake: bool) -> None:
    """Store the test state on the publisher.

    Args:
        mock: Mock object saved as ``self.mock``.
        with_fake: Value saved as the ``_fake_handler`` flag.
    """
    self._fake_handler = with_fake
    self.mock = mock