Skip to content

BasePublisher

faststream.broker.publisher.BasePublisher dataclass #

Bases: AsyncAPIOperation, Generic[MsgType]

A base class for publishers in an asynchronous API.

METHOD DESCRIPTION
description

returns the description of the publisher

__call__

decorator to register a function as a handler for the publisher

publish

publishes a message with optional correlation ID

RAISES DESCRIPTION
NotImplementedError

if the publish method is not implemented.

calls class-attribute instance-attribute #

calls: List[Callable[..., Any]] = field(
    init=False, default_factory=list, repr=False
)

description property #

description: Optional[str]

include_in_schema class-attribute instance-attribute #

include_in_schema: bool = field(default=True)

mock class-attribute instance-attribute #

mock: Optional[MagicMock] = field(
    init=False, default=None, repr=False
)

title class-attribute instance-attribute #

title: Optional[str] = field(default=None)

get_payloads #

get_payloads() -> List[Tuple[AnyDict, str]]
Source code in faststream/broker/publisher.py
def get_payloads(self) -> List[Tuple[AnyDict, str]]:
    payloads: List[Tuple[AnyDict, str]] = []

    if self._schema:
        call_model: CallModel[Any, Any] = CallModel(
            call=lambda: None,
            model=create_model("Fake"),
            response_model=create_model(
                "",
                __config__=get_config_base(),  # type: ignore[arg-type]
                response__=(self._schema, ...),
            ),
        )

        body = get_response_schema(
            call_model,
            prefix=f"{self.name}:Message",
        )
        if body:  # pragma: no branch
            payloads.append((body, ""))

    else:
        for call in self.calls:
            call_model = build_call_model(call)
            body = get_response_schema(
                call_model,
                prefix=f"{self.name}:Message",
            )
            if body:
                payloads.append((body, to_camelcase(unwrap(call).__name__)))

    return payloads

name #

name() -> str

Returns the name of the API operation.

Source code in faststream/asyncapi/base.py
@abstractproperty
def name(self) -> str:
    """Returns the name of the API operation."""
    raise NotImplementedError()

publish abstractmethod async #

publish(
    message: SendableMessage,
    correlation_id: Optional[str] = None,
    **kwargs: Any
) -> Optional[SendableMessage]

Publish a message.

PARAMETER DESCRIPTION
message

The message to be published.

TYPE: SendableMessage

correlation_id

Optional correlation ID for the message.

TYPE: Optional[str] DEFAULT: None

**kwargs

Additional keyword arguments.

TYPE: Any DEFAULT: {}

RETURNS DESCRIPTION
Optional[SendableMessage]

The published message.

RAISES DESCRIPTION
NotImplementedError

If the method is not implemented.

Source code in faststream/broker/publisher.py
@abstractmethod
async def publish(
    self,
    message: SendableMessage,
    correlation_id: Optional[str] = None,
    **kwargs: Any,
) -> Optional[SendableMessage]:
    """Publish a message.

    Args:
        message: The message to be published.
        correlation_id: Optional correlation ID for the message.
        **kwargs: Additional keyword arguments.

    Returns:
        The published message.

    Raises:
        NotImplementedError: If the method is not implemented.

    """
    raise NotImplementedError()

reset_test #

reset_test() -> None
Source code in faststream/broker/publisher.py
def reset_test(self) -> None:
    self._fake_handler = False
    self.mock = None

schema #

schema() -> Dict[str, Channel]

Returns the schema of the API operation as a dictionary of channel names and channel objects.

Source code in faststream/asyncapi/base.py
def schema(self) -> Dict[str, Channel]:  # pragma: no cover
    """Returns the schema of the API operation as a dictionary of channel names and channel objects."""
    return {}

set_test #

set_test(mock: MagicMock, with_fake: bool) -> None
Source code in faststream/broker/publisher.py
def set_test(
    self,
    mock: MagicMock,
    with_fake: bool,
) -> None:
    self.mock = mock
    self._fake_handler = with_fake