Skip to content

AsyncAPIPublisher

faststream.redis.publisher.asyncapi.AsyncAPIPublisher #

AsyncAPIPublisher(*, reply_to, headers, broker_middlewares, middlewares, schema_, title_, description_, include_in_schema)

Bases: LogicPublisher, RedisAsyncAPIProtocol

A class to represent a Redis publisher.

Source code in faststream/redis/publisher/usecase.py
def __init__(
    self,
    *,
    reply_to: str,
    headers: Optional["AnyDict"],
    # Publisher args
    broker_middlewares: Sequence["BrokerMiddleware[UnifyRedisDict]"],
    middlewares: Sequence["PublisherMiddleware"],
    # AsyncAPI args
    schema_: Optional[Any],
    title_: Optional[str],
    description_: Optional[str],
    include_in_schema: bool,
) -> None:
    super().__init__(
        broker_middlewares=broker_middlewares,
        middlewares=middlewares,
        # AsyncAPI args
        schema_=schema_,
        title_=title_,
        description_=description_,
        include_in_schema=include_in_schema,
    )

    self.reply_to = reply_to
    self.headers = headers

    self._producer = None

title_ instance-attribute #

title_ = title_

description_ instance-attribute #

description_ = description_

include_in_schema instance-attribute #

include_in_schema = include_in_schema

name property #

name

Returns the name of the API operation.

description property #

description

Returns the description of the API operation.

schema_ instance-attribute #

schema_ = schema_

channel_binding abstractmethod property #

channel_binding

mock instance-attribute #

mock = None

calls instance-attribute #

calls = []

reply_to instance-attribute #

reply_to = reply_to

headers instance-attribute #

headers = headers

publish abstractmethod async #

publish(message, /, *, correlation_id=None, _extra_middlewares=())

Publishes a message asynchronously.

Source code in faststream/broker/publisher/proto.py
@abstractmethod
async def publish(
    self,
    message: "SendableMessage",
    /,
    *,
    correlation_id: Optional[str] = None,
    _extra_middlewares: Sequence["PublisherMiddleware"] = (),
) -> Optional[Any]:
    """Publishes a message asynchronously."""
    ...

request abstractmethod async #

request(message, /, *, correlation_id=None, _extra_middlewares=())

Publishes a message synchronously.

Source code in faststream/broker/publisher/proto.py
@abstractmethod
async def request(
    self,
    message: "SendableMessage",
    /,
    *,
    correlation_id: Optional[str] = None,
    _extra_middlewares: Sequence["PublisherMiddleware"] = (),
) -> Optional[Any]:
    """Publishes a message synchronously."""
    ...

setup #

setup(*, producer)
Source code in faststream/broker/publisher/usecase.py
@override
def setup(  # type: ignore[override]
    self,
    *,
    producer: Optional["ProducerProto"],
) -> None:
    self._producer = producer

add_prefix abstractmethod #

add_prefix(prefix)
Source code in faststream/broker/proto.py
@abstractmethod
def add_prefix(self, prefix: str) -> None: ...

schema #

schema()

Returns the schema of the API operation as a dictionary of channel names and channel objects.

Source code in faststream/asyncapi/abc.py
def schema(self) -> Dict[str, Channel]:
    """Returns the schema of the API operation as a dictionary of channel names and channel objects."""
    if self.include_in_schema:
        return self.get_schema()
    else:
        return {}

add_middleware #

add_middleware(middleware)
Source code in faststream/broker/publisher/usecase.py
def add_middleware(self, middleware: "BrokerMiddleware[MsgType]") -> None:
    self._broker_middlewares = (*self._broker_middlewares, middleware)

get_name abstractmethod #

get_name()

Name property fallback.

Source code in faststream/asyncapi/abc.py
@abstractmethod
def get_name(self) -> str:
    """Name property fallback."""
    raise NotImplementedError()

get_description #

get_description()

Description property fallback.

Source code in faststream/asyncapi/abc.py
def get_description(self) -> Optional[str]:
    """Description property fallback."""
    return None

get_payloads #

get_payloads()
Source code in faststream/broker/publisher/usecase.py
def get_payloads(self) -> List[Tuple["AnyDict", str]]:
    payloads: List[Tuple[AnyDict, str]] = []

    if self.schema_:
        params = {"response__": (self.schema_, ...)}

        call_model: CallModel[Any, Any] = CallModel(
            call=lambda: None,
            model=create_model("Fake"),
            response_model=create_model(  # type: ignore[call-overload]
                "",
                __config__=get_config_base(),
                **params,
            ),
            params=params,
        )

        body = get_response_schema(
            call_model,
            prefix=f"{self.name}:Message",
        )
        if body:  # pragma: no branch
            payloads.append((body, ""))

    else:
        for call in self.calls:
            call_model = build_call_model(call)
            body = get_response_schema(
                call_model,
                prefix=f"{self.name}:Message",
            )
            if body:
                payloads.append((body, to_camelcase(unwrap(call).__name__)))

    return payloads

set_test #

set_test(*, mock, with_fake)

Turn publisher to testing mode.

Source code in faststream/broker/publisher/usecase.py
def set_test(
    self,
    *,
    mock: Annotated[
        MagicMock,
        Doc("Mock object to check in tests."),
    ],
    with_fake: Annotated[
        bool,
        Doc("Whetevet publisher's fake subscriber created or not."),
    ],
) -> None:
    """Turn publisher to testing mode."""
    self.mock = mock
    self._fake_handler = with_fake

reset_test #

reset_test()

Turn off publisher's testing mode.

Source code in faststream/broker/publisher/usecase.py
def reset_test(self) -> None:
    """Turn off publisher's testing mode."""
    self._fake_handler = False
    self.mock = None

subscriber_property abstractmethod #

subscriber_property(*, name_only)
Source code in faststream/redis/publisher/usecase.py
@abstractmethod
def subscriber_property(self, *, name_only: bool) -> "AnyDict":
    raise NotImplementedError()

get_schema #

get_schema()
Source code in faststream/redis/publisher/asyncapi.py
def get_schema(self) -> Dict[str, Channel]:
    payloads = self.get_payloads()

    return {
        self.name: Channel(
            description=self.description,
            publish=Operation(
                message=Message(
                    title=f"{self.name}:Message",
                    payload=resolve_payloads(payloads, "Publisher"),
                    correlationId=CorrelationId(
                        location="$message.header#/correlation_id"
                    ),
                ),
            ),
            bindings=ChannelBinding(
                redis=self.channel_binding,
            ),
        )
    }

create staticmethod #

create(*, channel, list, stream, headers, reply_to, broker_middlewares, middlewares, title_, description_, schema_, include_in_schema)
Source code in faststream/redis/publisher/asyncapi.py
@override
@staticmethod
def create(  # type: ignore[override]
    *,
    channel: Union["PubSub", str, None],
    list: Union["ListSub", str, None],
    stream: Union["StreamSub", str, None],
    headers: Optional["AnyDict"],
    reply_to: str,
    broker_middlewares: Sequence["BrokerMiddleware[UnifyRedisDict]"],
    middlewares: Sequence["PublisherMiddleware"],
    # AsyncAPI args
    title_: Optional[str],
    description_: Optional[str],
    schema_: Optional[Any],
    include_in_schema: bool,
) -> PublisherType:
    validate_options(channel=channel, list=list, stream=stream)

    if (channel := PubSub.validate(channel)) is not None:
        return AsyncAPIChannelPublisher(
            channel=channel,
            # basic args
            headers=headers,
            reply_to=reply_to,
            broker_middlewares=broker_middlewares,
            middlewares=middlewares,
            # AsyncAPI args
            title_=title_,
            description_=description_,
            schema_=schema_,
            include_in_schema=include_in_schema,
        )

    elif (stream := StreamSub.validate(stream)) is not None:
        return AsyncAPIStreamPublisher(
            stream=stream,
            # basic args
            headers=headers,
            reply_to=reply_to,
            broker_middlewares=broker_middlewares,
            middlewares=middlewares,
            # AsyncAPI args
            title_=title_,
            description_=description_,
            schema_=schema_,
            include_in_schema=include_in_schema,
        )

    elif (list := ListSub.validate(list)) is not None:
        if list.batch:
            return AsyncAPIListBatchPublisher(
                list=list,
                # basic args
                headers=headers,
                reply_to=reply_to,
                broker_middlewares=broker_middlewares,
                middlewares=middlewares,
                # AsyncAPI args
                title_=title_,
                description_=description_,
                schema_=schema_,
                include_in_schema=include_in_schema,
            )
        else:
            return AsyncAPIListPublisher(
                list=list,
                # basic args
                headers=headers,
                reply_to=reply_to,
                broker_middlewares=broker_middlewares,
                middlewares=middlewares,
                # AsyncAPI args
                title_=title_,
                description_=description_,
                schema_=schema_,
                include_in_schema=include_in_schema,
            )

    else:
        raise SetupError(INCORRECT_SETUP_MSG)