Skip to content

FakePublisher

faststream.broker.publisher.fake.FakePublisher #

FakePublisher(method, *, publish_kwargs, middlewares=())

Bases: BasePublisherProto

Publisher interface implementation used as an RPC or reply-to publisher.

Initialize an object.

Source code in faststream/broker/publisher/fake.py
def __init__(
    self,
    method: "AsyncFunc",
    *,
    publish_kwargs: "AnyDict",
    middlewares: Sequence["PublisherMiddleware"] = (),
) -> None:
    """Remember the publishing callable and its default options.

    Args:
        method: Callable kept on the instance as ``self.method``.
        publish_kwargs: Mapping kept as ``self.publish_kwargs``; it is
            stored by reference, not copied.
        middlewares: Sequence kept as ``self.middlewares``; defaults to
            an empty tuple.
    """
    # All three arguments are stored untouched under the same names.
    self.method, self.publish_kwargs, self.middlewares = (
        method,
        publish_kwargs,
        middlewares,
    )

method instance-attribute #

method = method

publish_kwargs instance-attribute #

publish_kwargs = publish_kwargs

middlewares instance-attribute #

middlewares = middlewares

publish async #

publish(message, *, correlation_id=None, _extra_middlewares=(), **kwargs)

Publish a message.

Source code in faststream/broker/publisher/fake.py
async def publish(
    self,
    message: "SendableMessage",
    *,
    correlation_id: Optional[str] = None,
    _extra_middlewares: Sequence["PublisherMiddleware"] = (),
    **kwargs: Any,
) -> Any:
    """Send ``message`` to the wrapped method through the middleware stack.

    Args:
        message: Payload forwarded as the first positional argument.
        correlation_id: Seed value for the ``"correlation_id"`` option; it
            is replaced if ``self.publish_kwargs`` or ``kwargs`` carry the
            same key.
        _extra_middlewares: Middlewares applied in addition to
            ``self.middlewares``.
        **kwargs: Per-call options; they take precedence over everything
            else.

    Returns:
        Whatever the outermost callable in the stack returns.
    """
    # Precedence, lowest to highest: the explicit correlation_id, then
    # the instance-level defaults, then the per-call keyword arguments.
    options = {"correlation_id": correlation_id}
    options.update(self.publish_kwargs)
    options.update(kwargs)

    # Each middleware receives the callable built so far as its first
    # argument, so the last middleware iterated ends up outermost.
    handler = self.method
    for middleware in chain(_extra_middlewares, self.middlewares):
        handler = partial(middleware, handler)

    return await handler(message, **options)

request async #

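request(message, /, *, correlation_id=None, _extra_middlewares=())

Not supported: always raises NotImplementedError, because FakePublisher can only publish a response for reply-to or RPC messages.

Source code in faststream/broker/publisher/fake.py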
async def request(
    self,
    message: "SendableMessage",
    /,
    *,
    correlation_id: Optional[str] = None,
    _extra_middlewares: Sequence["PublisherMiddleware"] = (),
) -> Any:
    """Reject request-style usage of this publisher.

    The arguments are accepted but never used.

    Raises:
        NotImplementedError: Always, regardless of the arguments.
    """
    reason = (
        "`FakePublisher` can be used only to publish "
        "a response for `reply-to` or `RPC` messages."
    )
    raise NotImplementedError(reason)