Skip to content

PublisherProto

faststream.broker.publisher.proto.PublisherProto #

Bases: AsyncAPIProto, EndpointProto, BasePublisherProto, Generic[MsgType]

title_ instance-attribute #

title_

AsyncAPI object title.

description_ instance-attribute #

description_

AsyncAPI object description.

include_in_schema instance-attribute #

include_in_schema

Whether to include the operation in the AsyncAPI schema or not.

name abstractmethod property #

name

Returns the name of the API operation.

description abstractmethod property #

description

Returns the description of the API operation.

schema_ instance-attribute #

schema_

publish abstractmethod async #

publish(message, /, *, correlation_id=None, _extra_middlewares=())

Publishes a message asynchronously.

Source code in faststream/broker/publisher/proto.py
@abstractmethod
async def publish(
    self,
    message: "SendableMessage",
    /,
    *,
    correlation_id: Optional[str] = None,
    _extra_middlewares: Iterable["PublisherMiddleware"] = (),
) -> Optional[Any]:
    """Publish a message asynchronously.

    Abstract coroutine: this declaration has no behavior of its own, so
    concrete publishers must provide the implementation.

    Args:
        message: The payload to send. Positional-only.
        correlation_id: Optional correlation identifier for the message.
            Keyword-only; defaults to ``None``.
        _extra_middlewares: Additional publisher middlewares. Keyword-only;
            defaults to an empty tuple. Presumably applied to this call
            only -- confirm against concrete implementations.

    Returns:
        An implementation-defined value, or ``None``.
    """
    ...

request abstractmethod async #

request(message, /, *, correlation_id=None, _extra_middlewares=())

Publishes a message as a request and asynchronously awaits the result.

Source code in faststream/broker/publisher/proto.py
@abstractmethod
async def request(
    self,
    message: "SendableMessage",
    /,
    *,
    correlation_id: Optional[str] = None,
    _extra_middlewares: Iterable["PublisherMiddleware"] = (),
) -> Optional[Any]:
    """Publish a message as a request and await the result.

    Abstract coroutine: despite being a blocking-style "request" from the
    caller's point of view, it is declared ``async`` and must be awaited.
    NOTE(review): presumably the awaited value is the reply to the
    request -- confirm against concrete implementations.

    Args:
        message: The payload to send. Positional-only.
        correlation_id: Optional correlation identifier for the message.
            Keyword-only; defaults to ``None``.
        _extra_middlewares: Additional publisher middlewares. Keyword-only;
            defaults to an empty tuple.

    Returns:
        An implementation-defined value, or ``None``.
    """
    ...

add_prefix abstractmethod #

add_prefix(prefix)
Source code in faststream/broker/proto.py
# Abstract hook taking a string ``prefix`` and returning nothing.
# NOTE(review): what the prefix is applied to (presumably the endpoint's
# destination name) is not shown here -- confirm in concrete implementations.
@abstractmethod
def add_prefix(self, prefix: str) -> None: ...

schema abstractmethod #

schema()

Generate AsyncAPI schema.

Source code in faststream/asyncapi/proto.py
@abstractmethod
def schema(self) -> Dict[str, "Channel"]:
    """Generate AsyncAPI schema.

    Abstract method; concrete classes build the actual schema.

    Returns:
        A mapping from string keys to ``Channel`` objects. Presumably the
        keys are channel names -- confirm against implementations.
    """
    ...

add_middleware abstractmethod #

add_middleware(middleware)
Source code in faststream/broker/publisher/proto.py
# Abstract hook accepting a ``BrokerMiddleware`` parametrized by this
# publisher's ``MsgType``; returns nothing.
# NOTE(review): presumably registers the middleware on the publisher --
# confirm ordering/deduplication rules in concrete implementations.
@abstractmethod
def add_middleware(self, middleware: "BrokerMiddleware[MsgType]") -> None: ...

create abstractmethod staticmethod #

create()

Abstract factory to create a real Publisher.

Source code in faststream/broker/publisher/proto.py
@staticmethod
@abstractmethod
def create() -> "PublisherProto[MsgType]":
    """Abstract factory to create a real Publisher.

    Declared as a static method with no parameters; it takes neither an
    instance nor the class.

    Returns:
        A ``PublisherProto`` parametrized by the same ``MsgType``.
    """
    ...

setup abstractmethod #

setup(*, producer)
Source code in faststream/broker/publisher/proto.py
# Abstract setup hook. Accepts a single keyword-only ``producer`` argument,
# which may be ``None``, and returns nothing.
# The ``type: ignore[override]`` below silences the type checker's
# incompatible-override error -- presumably because a base class declares
# ``setup`` with a different signature; confirm against the parent protos.
@override
@abstractmethod
def setup(  # type: ignore[override]
    self,
    *,
    producer: Optional["ProducerProto"],
) -> None: ...