Skip to content

LogicPublisher

faststream.rabbit.publisher.usecase.LogicPublisher #

LogicPublisher(*, routing_key, queue, exchange, message_kwargs, broker_middlewares, middlewares, schema_, title_, description_, include_in_schema)

Bases: PublisherUsecase[IncomingMessage], BaseRMQInformation

A class to represent a RabbitMQ publisher.

Source code in faststream/rabbit/publisher/usecase.py
def __init__(
    self,
    *,
    routing_key: str,
    queue: "RabbitQueue",
    exchange: "RabbitExchange",
    message_kwargs: "PublishKwargs",
    # Publisher args
    broker_middlewares: Iterable["BrokerMiddleware[IncomingMessage]"],
    middlewares: Iterable["PublisherMiddleware"],
    # AsyncAPI args
    schema_: Optional[Any],
    title_: Optional[str],
    description_: Optional[str],
    include_in_schema: bool,
) -> None:
    super().__init__(
        broker_middlewares=broker_middlewares,
        middlewares=middlewares,
        # AsyncAPI args
        schema_=schema_,
        title_=title_,
        description_=description_,
        include_in_schema=include_in_schema,
    )

    self.routing_key = routing_key

    request_kwargs = dict(message_kwargs)
    self.reply_to = request_kwargs.pop("reply_to", None)
    self.message_kwargs = request_kwargs

    # BaseRMQInformation
    self.queue = queue
    self.exchange = exchange

    # Setup it later
    self.app_id = None
    self.virtual_host = ""

title_ instance-attribute #

title_ = title_

description_ instance-attribute #

description_ = description_

include_in_schema instance-attribute #

include_in_schema = include_in_schema

name property #

name

Returns the name of the API operation.

description property #

description

Returns the description of the API operation.

schema_ instance-attribute #

schema_ = schema_

mock instance-attribute #

mock = None

calls instance-attribute #

calls = []

routing_key instance-attribute #

routing_key = routing_key

reply_to instance-attribute #

reply_to = pop('reply_to', None)

message_kwargs instance-attribute #

message_kwargs = request_kwargs

queue instance-attribute #

queue = queue

exchange instance-attribute #

exchange = exchange

app_id instance-attribute #

app_id = None

virtual_host instance-attribute #

virtual_host = ''

routing property #

routing

Return real routing_key of Publisher.

schema #

schema()

Returns the schema of the API operation as a dictionary of channel names and channel objects.

Source code in faststream/asyncapi/abc.py
def schema(self) -> Dict[str, Channel]:
    """Returns the schema of the API operation as a dictionary of channel names and channel objects."""
    if self.include_in_schema:
        return self.get_schema()
    else:
        return {}

add_middleware #

add_middleware(middleware)
Source code in faststream/broker/publisher/usecase.py
def add_middleware(self, middleware: "BrokerMiddleware[MsgType]") -> None:
    self._broker_middlewares = (*self._broker_middlewares, middleware)

create abstractmethod staticmethod #

create()

Abstract factory to create a real Publisher.

Source code in faststream/broker/publisher/proto.py
@staticmethod
@abstractmethod
def create() -> "PublisherProto[MsgType]":
    """Abstract factory to create a real Publisher."""
    ...

get_name abstractmethod #

get_name()

Name property fallback.

Source code in faststream/asyncapi/abc.py
@abstractmethod
def get_name(self) -> str:
    """Name property fallback."""
    raise NotImplementedError()

get_description #

get_description()

Description property fallback.

Source code in faststream/asyncapi/abc.py
def get_description(self) -> Optional[str]:
    """Description property fallback."""
    return None

get_schema abstractmethod #

get_schema()

Generate AsyncAPI schema.

Source code in faststream/asyncapi/abc.py
@abstractmethod
def get_schema(self) -> Dict[str, Channel]:
    """Generate AsyncAPI schema."""
    raise NotImplementedError()

get_payloads #

get_payloads()
Source code in faststream/broker/publisher/usecase.py
def get_payloads(self) -> List[Tuple["AnyDict", str]]:
    payloads: List[Tuple[AnyDict, str]] = []

    if self.schema_:
        params = {"response__": (self.schema_, ...)}

        call_model: CallModel[Any, Any] = CallModel(
            call=lambda: None,
            model=create_model("Fake"),
            response_model=create_model(  # type: ignore[call-overload]
                "",
                __config__=get_config_base(),
                **params,
            ),
            params=params,
        )

        body = get_response_schema(
            call_model,
            prefix=f"{self.name}:Message",
        )
        if body:  # pragma: no branch
            payloads.append((body, ""))

    else:
        for call in self.calls:
            call_model = build_call_model(call)
            body = get_response_schema(
                call_model,
                prefix=f"{self.name}:Message",
            )
            if body:
                payloads.append((body, to_camelcase(unwrap(call).__name__)))

    return payloads

set_test #

set_test(*, mock, with_fake)

Turn publisher to testing mode.

Source code in faststream/broker/publisher/usecase.py
def set_test(
    self,
    *,
    mock: Annotated[
        MagicMock,
        Doc("Mock object to check in tests."),
    ],
    with_fake: Annotated[
        bool,
        Doc("Whetevet publisher's fake subscriber created or not."),
    ],
) -> None:
    """Turn publisher to testing mode."""
    self.mock = mock
    self._fake_handler = with_fake

reset_test #

reset_test()

Turn off publisher's testing mode.

Source code in faststream/broker/publisher/usecase.py
def reset_test(self) -> None:
    """Turn off publisher's testing mode."""
    self._fake_handler = False
    self.mock = None

setup #

setup(*, producer, app_id, virtual_host)
Source code in faststream/rabbit/publisher/usecase.py
@override
def setup(  # type: ignore[override]
    self,
    *,
    producer: Optional["AioPikaFastProducer"],
    app_id: Optional[str],
    virtual_host: str,
) -> None:
    self.app_id = app_id
    self.virtual_host = virtual_host
    super().setup(producer=producer)

publish async #

publish(message, queue=None, exchange=None, *, routing_key='', correlation_id=None, message_id=None, timestamp=None, rpc=False, rpc_timeout=30.0, raise_timeout=False, _extra_middlewares=(), **publish_kwargs)
Source code in faststream/rabbit/publisher/usecase.py
@override
async def publish(
    self,
    message: "AioPikaSendableMessage",
    queue: Annotated[
        Union["RabbitQueue", str, None],
        Doc("Message routing key to publish with."),
    ] = None,
    exchange: Annotated[
        Union["RabbitExchange", str, None],
        Doc("Target exchange to publish message to."),
    ] = None,
    *,
    routing_key: Annotated[
        str,
        Doc(
            "Message routing key to publish with. "
            "Overrides `queue` option if presented."
        ),
    ] = "",
    # message args
    correlation_id: Annotated[
        Optional[str],
        Doc(
            "Manual message **correlation_id** setter. "
            "**correlation_id** is a useful option to trace messages."
        ),
    ] = None,
    message_id: Annotated[
        Optional[str],
        Doc("Arbitrary message id. Generated automatically if not presented."),
    ] = None,
    timestamp: Annotated[
        Optional["DateType"],
        Doc("Message publish timestamp. Generated automatically if not presented."),
    ] = None,
    # rpc args
    rpc: Annotated[
        bool,
        Doc("Whether to wait for reply in blocking mode."),
        deprecated(
            "Deprecated in **FastStream 0.5.17**. "
            "Please, use `request` method instead. "
            "Argument will be removed in **FastStream 0.6.0**."
        ),
    ] = False,
    rpc_timeout: Annotated[
        Optional[float],
        Doc("RPC reply waiting time."),
        deprecated(
            "Deprecated in **FastStream 0.5.17**. "
            "Please, use `request` method with `timeout` instead. "
            "Argument will be removed in **FastStream 0.6.0**."
        ),
    ] = 30.0,
    raise_timeout: Annotated[
        bool,
        Doc(
            "Whetever to raise `TimeoutError` or return `None` at **rpc_timeout**. "
            "RPC request returns `None` at timeout by default."
        ),
        deprecated(
            "Deprecated in **FastStream 0.5.17**. "
            "`request` always raises TimeoutError instead. "
            "Argument will be removed in **FastStream 0.6.0**."
        ),
    ] = False,
    # publisher specific
    _extra_middlewares: Annotated[
        Iterable["PublisherMiddleware"],
        Doc("Extra middlewares to wrap publishing process."),
    ] = (),
    **publish_kwargs: "Unpack[PublishKwargs]",
) -> Optional[Any]:
    assert self._producer, NOT_CONNECTED_YET  # nosec B101

    kwargs: AnyDict = {
        "routing_key": routing_key
        or self.routing_key
        or RabbitQueue.validate(queue or self.queue).routing,
        "exchange": exchange or self.exchange.name,
        "app_id": self.app_id,
        "correlation_id": correlation_id or gen_cor_id(),
        "message_id": message_id,
        "timestamp": timestamp,
        # specific args
        "rpc": rpc,
        "rpc_timeout": rpc_timeout,
        "raise_timeout": raise_timeout,
        "reply_to": self.reply_to,
        **self.message_kwargs,
        **publish_kwargs,
    }

    call: AsyncFunc = self._producer.publish

    for m in chain(
        (
            _extra_middlewares
            or (m(None).publish_scope for m in self._broker_middlewares)
        ),
        self._middlewares,
    ):
        call = partial(m, call)

    return await call(message, **kwargs)

request async #

request(message, queue=None, exchange=None, *, routing_key='', correlation_id=None, message_id=None, timestamp=None, _extra_middlewares=(), **publish_kwargs)
Source code in faststream/rabbit/publisher/usecase.py
@override
async def request(
    self,
    message: "AioPikaSendableMessage",
    queue: Annotated[
        Union["RabbitQueue", str, None],
        Doc("Message routing key to publish with."),
    ] = None,
    exchange: Annotated[
        Union["RabbitExchange", str, None],
        Doc("Target exchange to publish message to."),
    ] = None,
    *,
    routing_key: Annotated[
        str,
        Doc(
            "Message routing key to publish with. "
            "Overrides `queue` option if presented."
        ),
    ] = "",
    # message args
    correlation_id: Annotated[
        Optional[str],
        Doc(
            "Manual message **correlation_id** setter. "
            "**correlation_id** is a useful option to trace messages."
        ),
    ] = None,
    message_id: Annotated[
        Optional[str],
        Doc("Arbitrary message id. Generated automatically if not presented."),
    ] = None,
    timestamp: Annotated[
        Optional["DateType"],
        Doc("Message publish timestamp. Generated automatically if not presented."),
    ] = None,
    # publisher specific
    _extra_middlewares: Annotated[
        Iterable["PublisherMiddleware"],
        Doc("Extra middlewares to wrap publishing process."),
    ] = (),
    **publish_kwargs: "Unpack[RequestPublishKwargs]",
) -> "RabbitMessage":
    assert self._producer, NOT_CONNECTED_YET  # nosec B101

    kwargs: AnyDict = {
        "routing_key": routing_key
        or self.routing_key
        or RabbitQueue.validate(queue or self.queue).routing,
        "exchange": exchange or self.exchange.name,
        "app_id": self.app_id,
        "correlation_id": correlation_id or gen_cor_id(),
        "message_id": message_id,
        "timestamp": timestamp,
        # specific args
        **self.message_kwargs,
        **publish_kwargs,
    }

    request: AsyncFunc = self._producer.request

    for pub_m in chain(
        (
            _extra_middlewares
            or (m(None).publish_scope for m in self._broker_middlewares)
        ),
        self._middlewares,
    ):
        request = partial(pub_m, request)

    published_msg = await request(
        message,
        **kwargs,
    )

    async with AsyncExitStack() as stack:
        return_msg: Callable[[RabbitMessage], Awaitable[RabbitMessage]] = (
            return_input
        )
        for m in self._broker_middlewares:
            mid = m(published_msg)
            await stack.enter_async_context(mid)
            return_msg = partial(mid.consume_scope, return_msg)

        parsed_msg = await self._producer._parser(published_msg)
        parsed_msg._decoded_body = await self._producer._decoder(parsed_msg)
        parsed_msg._source_type = SourceType.Response
        return await return_msg(parsed_msg)

    raise AssertionError("unreachable")

add_prefix #

add_prefix(prefix)

Include Publisher in router.

Source code in faststream/rabbit/publisher/usecase.py
def add_prefix(self, prefix: str) -> None:
    """Include Publisher in router."""
    new_q = deepcopy(self.queue)
    new_q.name = prefix + new_q.name
    self.queue = new_q