Skip to content

BasePublisher

faststream.broker.publisher.BasePublisher dataclass #

BasePublisher(include_in_schema: bool = True, title: Optional[str] = None, _description: Optional[str] = None, _schema: Optional[Any] = None, _fake_handler: bool = False)

Bases: AsyncAPIOperation, Generic[MsgType]

A base class for publishers in an asynchronous API.

METHOD DESCRIPTION
description

returns the description of the publisher

__call__

decorator to register a function as a handler for the publisher

publish

publishes a message with optional correlation ID

RAISES DESCRIPTION
NotImplementedError

if the publish method is not implemented.

calls class-attribute instance-attribute #

calls: List[Callable[..., Any]] = field(init=False, default_factory=list, repr=False)

description property #

description: Optional[str]

include_in_schema class-attribute instance-attribute #

include_in_schema: bool = field(default=True)

mock class-attribute instance-attribute #

mock: Optional[MagicMock] = field(init=False, default=None, repr=False)

title class-attribute instance-attribute #

title: Optional[str] = field(default=None)

get_payloads #

get_payloads() -> List[Tuple[AnyDict, str]]
Source code in faststream/broker/publisher.py
def get_payloads(self) -> List[Tuple[AnyDict, str]]:
    payloads: List[Tuple[AnyDict, str]] = []

    if self._schema:
        params = {"response__": (self._schema, ...)}

        call_model: CallModel[Any, Any] = CallModel(
            call=lambda: None,
            model=create_model("Fake"),
            response_model=create_model(  # type: ignore[call-overload]
                "",
                __config__=get_config_base(),  # type: ignore[arg-type]
                **params,  # type: ignore[arg-type]
            ),
            params=params,
        )

        body = get_response_schema(
            call_model,
            prefix=f"{self.name}:Message",
        )
        if body:  # pragma: no branch
            payloads.append((body, ""))

    else:
        for call in self.calls:
            call_model = build_call_model(call)
            body = get_response_schema(
                call_model,
                prefix=f"{self.name}:Message",
            )
            if body:
                payloads.append((body, to_camelcase(unwrap(call).__name__)))

    return payloads

name #

name() -> str

Returns the name of the API operation.

Source code in faststream/asyncapi/base.py
@abstractproperty
def name(self) -> str:
    """Returns the name of the API operation."""
    raise NotImplementedError()

publish abstractmethod async #

publish(message: SendableMessage, correlation_id: Optional[str] = None, **kwargs: Any) -> Optional[SendableMessage]

Publish a message.

PARAMETER DESCRIPTION
message

The message to be published.

TYPE: SendableMessage

correlation_id

Optional correlation ID for the message.

TYPE: Optional[str] DEFAULT: None

**kwargs

Additional keyword arguments.

TYPE: Any DEFAULT: {}

RETURNS DESCRIPTION
Optional[SendableMessage]

The published message.

RAISES DESCRIPTION
NotImplementedError

If the method is not implemented.

Source code in faststream/broker/publisher.py
@abstractmethod
async def publish(
    self,
    message: SendableMessage,
    correlation_id: Optional[str] = None,
    **kwargs: Any,
) -> Optional[SendableMessage]:
    """Publish a message.

    Args:
        message: The message to be published.
        correlation_id: Optional correlation ID for the message.
        **kwargs: Additional keyword arguments.

    Returns:
        The published message.

    Raises:
        NotImplementedError: If the method is not implemented.

    """
    raise NotImplementedError()

reset_test #

reset_test() -> None
Source code in faststream/broker/publisher.py
def reset_test(self) -> None:
    self._fake_handler = False
    self.mock = None

schema #

schema() -> Dict[str, Channel]

Returns the schema of the API operation as a dictionary of channel names and channel objects.

Source code in faststream/asyncapi/base.py
def schema(self) -> Dict[str, Channel]:  # pragma: no cover
    """Returns the schema of the API operation as a dictionary of channel names and channel objects."""
    return {}

set_test #

set_test(mock: MagicMock, with_fake: bool) -> None
Source code in faststream/broker/publisher.py
def set_test(
    self,
    mock: MagicMock,
    with_fake: bool,
) -> None:
    self.mock = mock
    self._fake_handler = with_fake