Bases: LogicPublisher
A class to represent a NATS publisher.
calls class-attribute
instance-attribute
include_in_schema class-attribute
instance-attribute
include_in_schema: bool = field(default=True)
mock class-attribute
instance-attribute
reply_to class-attribute
instance-attribute
stream class-attribute
instance-attribute
subject class-attribute
instance-attribute
timeout class-attribute
instance-attribute
title class-attribute
instance-attribute
get_payloads
Source code in faststream/broker/publisher.py
| def get_payloads(self) -> List[Tuple[AnyDict, str]]:
payloads: List[Tuple[AnyDict, str]] = []
if self._schema:
call_model: CallModel[Any, Any] = CallModel(
call=lambda: None,
model=create_model("Fake"),
response_model=create_model(
"",
__config__=get_config_base(), # type: ignore[arg-type]
response__=(self._schema, ...),
),
)
body = get_response_schema(
call_model,
prefix=f"{self.name}:Message",
)
if body: # pragma: no branch
payloads.append((body, ""))
else:
for call in self.calls:
call_model = build_call_model(call)
body = get_response_schema(
call_model,
prefix=f"{self.name}:Message",
)
if body:
payloads.append((body, to_camelcase(unwrap(call).__name__)))
return payloads
|
publish async
Source code in faststream/nats/publisher.py
| @override
async def publish( # type: ignore[override]
self,
message: SendableMessage = "",
reply_to: str = "",
correlation_id: Optional[str] = None,
headers: Optional[Dict[str, str]] = None,
**producer_kwargs: Any,
) -> Optional[DecodedMessage]:
assert self._producer, NOT_CONNECTED_YET # nosec B101
assert self.subject, "You have to specify outgoing subject" # nosec B101
extra: AnyDict = {
"reply_to": reply_to or self.reply_to,
}
if self.stream is not None:
extra.update(
{
"stream": self.stream.name,
"timeout": self.timeout,
}
)
return await self._producer.publish(
message=message,
subject=self.subject,
headers=headers or self.headers,
correlation_id=correlation_id,
**extra,
**producer_kwargs,
)
|
reset_test
Source code in faststream/broker/publisher.py
| def reset_test(self) -> None:
self._fake_handler = False
self.mock = None
|
schema
Source code in faststream/nats/asyncapi.py
| def schema(self) -> Dict[str, Channel]:
if not self.include_in_schema:
return {}
payloads = self.get_payloads()
return {
self.name: Channel(
description=self.description,
publish=Operation(
message=Message(
title=f"{self.name}:Message",
payload=resolve_payloads(payloads, "Publisher"),
correlationId=CorrelationId(
location="$message.header#/correlation_id"
),
),
),
bindings=ChannelBinding(
nats=nats.ChannelBinding(
subject=self.subject,
)
),
)
}
|
set_test
Source code in faststream/broker/publisher.py
| def set_test(
self,
mock: MagicMock,
with_fake: bool,
) -> None:
self.mock = mock
self._fake_handler = with_fake
|