LogicPublisher(
*,
reply_to,
headers,
broker_middlewares,
middlewares,
schema_,
title_,
description_,
include_in_schema,
)
Bases: PublisherUsecase[UnifyRedisDict]
A class to represent a Redis publisher.
Source code in faststream/redis/publisher/usecase.py
| def __init__(
self,
*,
reply_to: str,
headers: Optional["AnyDict"],
# Publisher args
broker_middlewares: Sequence["BrokerMiddleware[UnifyRedisDict]"],
middlewares: Sequence["PublisherMiddleware"],
# AsyncAPI args
schema_: Optional[Any],
title_: Optional[str],
description_: Optional[str],
include_in_schema: bool,
) -> None:
super().__init__(
broker_middlewares=broker_middlewares,
middlewares=middlewares,
# AsyncAPI args
schema_=schema_,
title_=title_,
description_=description_,
include_in_schema=include_in_schema,
)
self.reply_to = reply_to
self.headers = headers
self._producer = None
|
title_ instance-attribute
description_ instance-attribute
description_ = description_
include_in_schema instance-attribute
include_in_schema = include_in_schema
name property
Returns the name of the API operation.
description property
Returns the description of the API operation.
schema_ instance-attribute
reply_to instance-attribute
publish abstractmethod
async
publish(
message,
/,
*,
correlation_id=None,
_extra_middlewares=(),
)
Publishes a message asynchronously.
Source code in faststream/broker/publisher/proto.py
| @abstractmethod
async def publish(
self,
message: "SendableMessage",
/,
*,
correlation_id: Optional[str] = None,
_extra_middlewares: Sequence["PublisherMiddleware"] = (),
) -> Optional[Any]:
"""Publishes a message asynchronously."""
...
|
request abstractmethod
async
request(
message,
/,
*,
correlation_id=None,
_extra_middlewares=(),
)
Publishes a message synchronously.
Source code in faststream/broker/publisher/proto.py
| @abstractmethod
async def request(
self,
message: "SendableMessage",
/,
*,
correlation_id: Optional[str] = None,
_extra_middlewares: Sequence["PublisherMiddleware"] = (),
) -> Optional[Any]:
"""Publishes a message synchronously."""
...
|
setup
Source code in faststream/broker/publisher/usecase.py
| @override
def setup( # type: ignore[override]
self,
*,
producer: Optional["ProducerProto"],
) -> None:
self._producer = producer
|
add_prefix abstractmethod
Source code in faststream/broker/proto.py
| @abstractmethod
def add_prefix(self, prefix: str) -> None: ...
|
schema
Returns the schema of the API operation as a dictionary of channel names and channel objects.
Source code in faststream/asyncapi/abc.py
| def schema(self) -> Dict[str, Channel]:
"""Returns the schema of the API operation as a dictionary of channel names and channel objects."""
if self.include_in_schema:
return self.get_schema()
else:
return {}
|
add_middleware
add_middleware(middleware)
Source code in faststream/broker/publisher/usecase.py
| def add_middleware(self, middleware: "BrokerMiddleware[MsgType]") -> None:
self._broker_middlewares = (*self._broker_middlewares, middleware)
|
create abstractmethod
staticmethod
Abstract factory to create a real Publisher.
Source code in faststream/broker/publisher/proto.py
| @staticmethod
@abstractmethod
def create() -> "PublisherProto[MsgType]":
"""Abstract factory to create a real Publisher."""
...
|
get_name abstractmethod
Name property fallback.
Source code in faststream/asyncapi/abc.py
| @abstractmethod
def get_name(self) -> str:
"""Name property fallback."""
raise NotImplementedError()
|
get_description
Description property fallback.
Source code in faststream/asyncapi/abc.py
| def get_description(self) -> Optional[str]:
"""Description property fallback."""
return None
|
get_schema abstractmethod
Generate AsyncAPI schema.
Source code in faststream/asyncapi/abc.py
| @abstractmethod
def get_schema(self) -> Dict[str, Channel]:
"""Generate AsyncAPI schema."""
raise NotImplementedError()
|
get_payloads
Source code in faststream/broker/publisher/usecase.py
| def get_payloads(self) -> List[Tuple["AnyDict", str]]:
payloads: List[Tuple[AnyDict, str]] = []
if self.schema_:
params = {"response__": (self.schema_, ...)}
call_model: CallModel[Any, Any] = CallModel(
call=lambda: None,
model=create_model("Fake"),
response_model=create_model( # type: ignore[call-overload]
"",
__config__=get_config_base(),
**params,
),
params=params,
)
body = get_response_schema(
call_model,
prefix=f"{self.name}:Message",
)
if body: # pragma: no branch
payloads.append((body, ""))
else:
for call in self.calls:
call_model = build_call_model(call)
body = get_response_schema(
call_model,
prefix=f"{self.name}:Message",
)
if body:
payloads.append((body, to_camelcase(unwrap(call).__name__)))
return payloads
|
set_test
set_test(*, mock, with_fake)
Turn publisher to testing mode.
Source code in faststream/broker/publisher/usecase.py
| def set_test(
self,
*,
mock: Annotated[
MagicMock,
Doc("Mock object to check in tests."),
],
with_fake: Annotated[
bool,
Doc("Whetevet publisher's fake subscriber created or not."),
],
) -> None:
"""Turn publisher to testing mode."""
self.mock = mock
self._fake_handler = with_fake
|
reset_test
Turn off publisher's testing mode.
Source code in faststream/broker/publisher/usecase.py
| def reset_test(self) -> None:
"""Turn off publisher's testing mode."""
self._fake_handler = False
self.mock = None
|
subscriber_property abstractmethod
subscriber_property(*, name_only)
Source code in faststream/redis/publisher/usecase.py
| @abstractmethod
def subscriber_property(self, *, name_only: bool) -> "AnyDict":
raise NotImplementedError()
|