AsyncAPIOperation

faststream.asyncapi.abc.AsyncAPIOperation #

Bases: AsyncAPIProto

A class representing an asynchronous API operation.

title_ instance-attribute #

title_

AsyncAPI object title.

description_ instance-attribute #

description_

AsyncAPI object description.

include_in_schema instance-attribute #

include_in_schema

Whether to include the operation in the AsyncAPI schema.

name property #

name

Returns the name of the API operation.

description property #

description

Returns the description of the API operation.

get_name abstractmethod #

get_name()

Name property fallback.

Source code in faststream/asyncapi/abc.py
@abstractmethod
def get_name(self) -> str:
    """Name property fallback."""
    raise NotImplementedError()

get_description #

get_description()

Description property fallback.

Source code in faststream/asyncapi/abc.py
def get_description(self) -> Optional[str]:
    """Description property fallback."""
    return None
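
The relationship between the properties and their fallback hooks can be illustrated with a stand-in class. This is a minimal sketch, not FastStream's implementation: `OperationSketch` and `OrderHandler` are hypothetical names, and the behavior of `name` and `description` (prefer `title_` and `description_`, otherwise call the fallback) is an assumption inferred from the "fallback" docstrings above.

```python
from abc import ABC, abstractmethod
from typing import Optional


class OperationSketch(ABC):
    """Hypothetical stand-in mirroring the attributes documented above."""

    def __init__(
        self,
        title_: Optional[str] = None,
        description_: Optional[str] = None,
    ) -> None:
        self.title_ = title_
        self.description_ = description_

    @property
    def name(self) -> str:
        # Assumption: an explicit title wins, otherwise use the fallback.
        return self.title_ or self.get_name()

    @property
    def description(self) -> Optional[str]:
        # Assumption: same pattern as `name`.
        return self.description_ or self.get_description()

    @abstractmethod
    def get_name(self) -> str:
        raise NotImplementedError()

    def get_description(self) -> Optional[str]:
        return None


class OrderHandler(OperationSketch):
    """Hypothetical subclass that only supplies the name fallback."""

    def get_name(self) -> str:
        return "orders:Handle"


untitled = OrderHandler()
titled = OrderHandler(title_="Orders", description_="Processes new orders.")
```

Under these assumptions, `untitled.name` comes from `get_name()` and `untitled.description` stays `None`, while `titled` reports the values passed explicitly.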

schema #

schema()

Returns the schema of the API operation as a dictionary mapping channel names to channel objects, or an empty dictionary when include_in_schema is false.

Source code in faststream/asyncapi/abc.py
def schema(self) -> Dict[str, Channel]:
    """Returns the schema of the API operation as a dictionary of channel names and channel objects."""
    if self.include_in_schema:
        return self.get_schema()
    else:
        return {}
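
The gating performed by `schema()` can be reproduced in isolation. The sketch below is illustrative only: `SchemaSketch` and `PublicOperation` are hypothetical names, and a plain dictionary stands in for the `Channel` type, which is not reproduced here.

```python
from abc import ABC, abstractmethod
from typing import Any, Dict


class SchemaSketch(ABC):
    """Hypothetical stand-in that copies the schema() logic shown above."""

    def __init__(self, include_in_schema: bool = True) -> None:
        self.include_in_schema = include_in_schema

    def schema(self) -> Dict[str, Any]:
        # Delegate to the subclass only when the operation is included.
        if self.include_in_schema:
            return self.get_schema()
        return {}

    @abstractmethod
    def get_schema(self) -> Dict[str, Any]:
        raise NotImplementedError()


class PublicOperation(SchemaSketch):
    def get_schema(self) -> Dict[str, Any]:
        # A plain dict is used as a placeholder for a Channel object.
        return {"orders": {"description": "placeholder channel"}}


visible = PublicOperation().schema()
hidden = PublicOperation(include_in_schema=False).schema()
```

Subclasses therefore implement `get_schema()` and leave `schema()` untouched, so the `include_in_schema` flag is honored in one place.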

get_schema abstractmethod #

get_schema()

Generate AsyncAPI schema.

Source code in faststream/asyncapi/abc.py
@abstractmethod
def get_schema(self) -> Dict[str, Channel]:
    """Generate AsyncAPI schema."""
    raise NotImplementedError()

get_payloads abstractmethod #

get_payloads()

Generate AsyncAPI payloads.

Source code in faststream/asyncapi/abc.py
@abstractmethod
def get_payloads(self) -> Any:
    """Generate AsyncAPI payloads."""
    raise NotImplementedError()
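
Taken together, a concrete operation has to supply `get_name()`, `get_schema()`, and `get_payloads()`. The following sketch shows that requirement with a stand-in base class. It assumes abstract methods are enforced the way `abc.ABC` enforces them; all class names are hypothetical and the return values are placeholders.

```python
from abc import ABC, abstractmethod
from typing import Any, Dict


class AbstractOperationSketch(ABC):
    """Hypothetical stand-in declaring the three abstract methods."""

    @abstractmethod
    def get_name(self) -> str:
        raise NotImplementedError()

    @abstractmethod
    def get_schema(self) -> Dict[str, Any]:
        raise NotImplementedError()

    @abstractmethod
    def get_payloads(self) -> Any:
        raise NotImplementedError()


class Incomplete(AbstractOperationSketch):
    """Implements two of the three methods; get_payloads is missing."""

    def get_name(self) -> str:
        return "incomplete"

    def get_schema(self) -> Dict[str, Any]:
        return {}


class Complete(Incomplete):
    """Adds the last missing method, so instantiation succeeds."""

    def get_payloads(self) -> Any:
        return []


try:
    Incomplete()
    instantiated = True
except TypeError:
    # abc refuses to instantiate a class with unimplemented abstract methods.
    instantiated = False

complete = Complete()
```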