PublisherUsecase(*, broker_middlewares, middlewares, schema_, title_, description_, include_in_schema)
Bases: ABC
, AsyncAPIOperation
, PublisherProto[MsgType]
A base class for publishers in an asynchronous API.
Source code in faststream/broker/publisher/usecase.py
| def __init__(
self,
*,
broker_middlewares: Annotated[
Iterable["BrokerMiddleware[MsgType]"],
Doc("Top-level middlewares to use in direct `.publish` call."),
],
middlewares: Annotated[
Iterable["PublisherMiddleware"],
Doc("Publisher middlewares."),
],
# AsyncAPI args
schema_: Annotated[
Optional[Any],
Doc(
"AsyncAPI publishing message type"
"Should be any python-native object annotation or `pydantic.BaseModel`."
),
],
title_: Annotated[
Optional[str],
Doc("AsyncAPI object title."),
],
description_: Annotated[
Optional[str],
Doc("AsyncAPI object description."),
],
include_in_schema: Annotated[
bool,
Doc("Whetever to include operation in AsyncAPI schema or not."),
],
) -> None:
self.calls = []
self._middlewares = middlewares
self._broker_middlewares = broker_middlewares
self._producer = None
self._fake_handler = False
self.mock = None
# AsyncAPI
self.title_ = title_
self.description_ = description_
self.include_in_schema = include_in_schema
self.schema_ = schema_
|
name property
Returns the name of the API operation.
description property
Returns the description of the API operation.
title_ instance-attribute
description_ instance-attribute
include_in_schema instance-attribute
schema_ instance-attribute
publish abstractmethod
async
publish(message, /, *, correlation_id=None, _extra_middlewares=())
Publishes a message asynchronously.
Source code in faststream/broker/publisher/proto.py
| @abstractmethod
async def publish(
self,
message: "SendableMessage",
/,
*,
correlation_id: Optional[str] = None,
_extra_middlewares: Iterable["PublisherMiddleware"] = (),
) -> Optional[Any]:
"""Publishes a message asynchronously."""
...
|
request abstractmethod
async
request(message, /, *, correlation_id=None, _extra_middlewares=())
Publishes a message synchronously.
Source code in faststream/broker/publisher/proto.py
| @abstractmethod
async def request(
self,
message: "SendableMessage",
/,
*,
correlation_id: Optional[str] = None,
_extra_middlewares: Iterable["PublisherMiddleware"] = (),
) -> Optional[Any]:
"""Publishes a message synchronously."""
...
|
add_prefix abstractmethod
Source code in faststream/broker/proto.py
| @abstractmethod
def add_prefix(self, prefix: str) -> None: ...
|
schema
Returns the schema of the API operation as a dictionary of channel names and channel objects.
Source code in faststream/asyncapi/abc.py
| def schema(self) -> Dict[str, Channel]:
"""Returns the schema of the API operation as a dictionary of channel names and channel objects."""
if self.include_in_schema:
return self.get_schema()
else:
return {}
|
create abstractmethod
staticmethod
Abstract factory to create a real Publisher.
Source code in faststream/broker/publisher/proto.py
| @staticmethod
@abstractmethod
def create() -> "PublisherProto[MsgType]":
"""Abstract factory to create a real Publisher."""
...
|
get_name abstractmethod
Name property fallback.
Source code in faststream/asyncapi/abc.py
| @abstractmethod
def get_name(self) -> str:
"""Name property fallback."""
raise NotImplementedError()
|
get_description
Description property fallback.
Source code in faststream/asyncapi/abc.py
| def get_description(self) -> Optional[str]:
"""Description property fallback."""
return None
|
get_schema abstractmethod
Generate AsyncAPI schema.
Source code in faststream/asyncapi/abc.py
| @abstractmethod
def get_schema(self) -> Dict[str, Channel]:
"""Generate AsyncAPI schema."""
raise NotImplementedError()
|
add_middleware
add_middleware(middleware)
Source code in faststream/broker/publisher/usecase.py
| def add_middleware(self, middleware: "BrokerMiddleware[MsgType]") -> None:
self._broker_middlewares = (*self._broker_middlewares, middleware)
|
setup
Source code in faststream/broker/publisher/usecase.py
| @override
def setup( # type: ignore[override]
self,
*,
producer: Optional["ProducerProto"],
) -> None:
self._producer = producer
|
set_test
set_test(*, mock, with_fake)
Turn publisher to testing mode.
Source code in faststream/broker/publisher/usecase.py
| def set_test(
self,
*,
mock: Annotated[
MagicMock,
Doc("Mock object to check in tests."),
],
with_fake: Annotated[
bool,
Doc("Whetevet publisher's fake subscriber created or not."),
],
) -> None:
"""Turn publisher to testing mode."""
self.mock = mock
self._fake_handler = with_fake
|
reset_test
Turn off publisher's testing mode.
Source code in faststream/broker/publisher/usecase.py
| def reset_test(self) -> None:
"""Turn off publisher's testing mode."""
self._fake_handler = False
self.mock = None
|
get_payloads
Source code in faststream/broker/publisher/usecase.py
| def get_payloads(self) -> List[Tuple["AnyDict", str]]:
payloads: List[Tuple[AnyDict, str]] = []
if self.schema_:
params = {"response__": (self.schema_, ...)}
call_model: CallModel[Any, Any] = CallModel(
call=lambda: None,
model=create_model("Fake"),
response_model=create_model( # type: ignore[call-overload]
"",
__config__=get_config_base(),
**params,
),
params=params,
)
body = get_response_schema(
call_model,
prefix=f"{self.name}:Message",
)
if body: # pragma: no branch
payloads.append((body, ""))
else:
for call in self.calls:
call_model = build_call_model(call)
body = get_response_schema(
call_model,
prefix=f"{self.name}:Message",
)
if body:
payloads.append((body, to_camelcase(unwrap(call).__name__)))
return payloads
|