FakePublisher(method, *, publish_kwargs, middlewares=())
Bases: BasePublisherProto
Publisher interface implementation used as an RPC or reply-to publisher.
Initialize an object.
Source code in faststream/broker/publisher/fake.py
def __init__(
    self,
    method: "AsyncFunc",
    *,
    publish_kwargs: "AnyDict",
    middlewares: Sequence["PublisherMiddleware"] = (),
) -> None:
    """Store the wrapped publish callable and its per-instance defaults.

    Args:
        method: Async callable that `publish` ultimately awaits.
        publish_kwargs: Keyword arguments merged into every `publish` call.
        middlewares: Publisher middlewares that `publish` wraps around
            `method`.
    """
    # Plain attribute storage only; no validation or copying is performed.
    self.middlewares = middlewares
    self.publish_kwargs = publish_kwargs
    self.method = method
method instance-attribute
publish_kwargs instance-attribute
middlewares instance-attribute
publish async
publish(message, *, correlation_id=None, _extra_middlewares=(), **kwargs)
Publish a message.
Source code in faststream/broker/publisher/fake.py
async def publish(
    self,
    message: "SendableMessage",
    *,
    correlation_id: Optional[str] = None,
    _extra_middlewares: Sequence["PublisherMiddleware"] = (),
    **kwargs: Any,
) -> Any:
    """Send `message` through the middleware stack to the wrapped method.

    Keyword arguments are layered in this order, with later layers winning
    on key collisions: the explicit `correlation_id`, then the instance's
    `publish_kwargs`, then the call-time `kwargs`.

    Args:
        message: Payload passed positionally to the wrapped callable.
        correlation_id: Lowest-priority value for the `correlation_id` key.
        _extra_middlewares: Middlewares applied before the instance's own,
            so they end up closer to the wrapped method.
        **kwargs: Highest-priority keyword arguments for the final call.

    Returns:
        Whatever the outermost callable in the chain returns.
    """
    # Build the final kwargs in two steps; precedence matches the docstring.
    defaults = {"correlation_id": correlation_id, **self.publish_kwargs}
    final_kwargs = {**defaults, **kwargs}

    # Each middleware receives the previously built callable as its first
    # argument, so the last middleware iterated becomes the outermost layer.
    wrapped: AsyncFunc = self.method
    for middleware in chain(_extra_middlewares, self.middlewares):
        wrapped = partial(middleware, wrapped)

    return await wrapped(message, **final_kwargs)
request async
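request(message, /, *, correlation_id=None, _extra_middlewares=())
Not supported: always raises `NotImplementedError`, because `FakePublisher` can only publish a response for `reply-to` or `RPC` messages.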
Source code in faststream/broker/publisher/fake.py
async def request(
    self,
    message: "SendableMessage",
    /,
    *,
    correlation_id: Optional[str] = None,
    _extra_middlewares: Sequence["PublisherMiddleware"] = (),
) -> Any:
    """Reject request-style usage of this publisher.

    The arguments are accepted but never used.

    Raises:
        NotImplementedError: Always.
    """
    reason = (
        "`FakePublisher` can be used only to publish "
        "a response for `reply-to` or `RPC` messages."
    )
    raise NotImplementedError(reason)