Bases: BasePublisher[Msg]
calls class-attribute
instance-attribute
calls: List[Callable[..., Any]] = field(
init=False, default_factory=list, repr=False
)
description property
description: Optional[str]
headers: Optional[Dict[str, str]] = field(default=None)
mock class-attribute
instance-attribute
mock: Optional[MagicMock] = field(
init=False, default=None, repr=False
)
reply_to class-attribute
instance-attribute
reply_to: str = field(default='')
stream class-attribute
instance-attribute
stream: Optional[JStream] = field(default=None)
subject class-attribute
instance-attribute
subject: str = field(default='')
timeout class-attribute
instance-attribute
timeout: Optional[float] = field(default=None)
title class-attribute
instance-attribute
title: Optional[str] = field(default=None)
get_payloads
get_payloads() -> List[Tuple[AnyDict, str]]
Source code in faststream/broker/publisher.py
| def get_payloads(self) -> List[Tuple[AnyDict, str]]:
payloads: List[Tuple[AnyDict, str]] = []
if self._schema:
call_model: CallModel[Any, Any] = CallModel(
call=lambda: None,
model=create_model("Fake"),
response_model=create_model(
"",
__base__=(CreateBaseModel,), # type: ignore[arg-type]
response__=(self._schema, ...),
),
)
body = get_response_schema(
call_model,
prefix=f"{self.name}:Message",
)
if body: # pragma: no branch
payloads.append((body, ""))
else:
for call in self.calls:
call_model = build_call_model(call)
body = get_response_schema(
call_model,
prefix=f"{self.name}:Message",
)
if body:
payloads.append((body, to_camelcase(unwrap(call).__name__)))
return payloads
|
name
Source code in faststream/asyncapi/base.py
| @abstractproperty
def name(self) -> str:
raise NotImplementedError()
|
publish async
publish(
message: SendableMessage = "",
reply_to: str = "",
correlation_id: Optional[str] = None,
headers: Optional[Dict[str, str]] = None,
**producer_kwargs: Any
) -> Optional[DecodedMessage]
Source code in faststream/nats/publisher.py
| @override
async def publish( # type: ignore[override]
self,
message: SendableMessage = "",
reply_to: str = "",
correlation_id: Optional[str] = None,
headers: Optional[Dict[str, str]] = None,
**producer_kwargs: Any,
) -> Optional[DecodedMessage]:
assert self._producer, "Please, setup `_producer` first" # nosec B101
assert self.subject, "You have to specify outcome subject" # nosec B101
extra: AnyDict = {
"reply_to": reply_to or self.reply_to,
}
if self.stream is not None:
extra.update(
{
"stream": self.stream.name,
"timeout": self.timeout,
}
)
return await self._producer.publish(
message=message,
subject=self.subject,
headers=headers or self.headers,
correlation_id=correlation_id,
**extra,
**producer_kwargs,
)
|
reset_test
Source code in faststream/broker/publisher.py
| def reset_test(self) -> None:
self._fake_handler = False
self.mock = None
|
schema
schema() -> Dict[str, Channel]
Source code in faststream/asyncapi/base.py
| def schema(self) -> Dict[str, Channel]: # pragma: no cover
return {}
|
set_test
set_test(mock: MagicMock, with_fake: bool) -> None
Source code in faststream/broker/publisher.py
| def set_test(
self,
mock: MagicMock,
with_fake: bool,
) -> None:
self.mock = mock
self._fake_handler = with_fake
|