Skip to content

BaseHandler

faststream.broker.handler.BaseHandler #

BaseHandler(
    *,
    log_context_builder: Callable[
        [StreamMessage[Any]], Dict[str, str]
    ],
    description: Optional[str] = None,
    title: Optional[str] = None,
    include_in_schema: bool = True
)

Bases: AsyncAPIOperation, Generic[MsgType]

A base handler class for AsyncAPI operations.

METHOD DESCRIPTION
__init__

Initializes the BaseHandler object.

name

Returns the name of the handler.

call_name

Returns the name of the handler call.

description

Returns the description of the handler.

consume

Abstract method to consume a message.

Note: This class inherits from AsyncAPIOperation and is a generic class with type parameter MsgType.

Initialize a new instance of the class.

PARAMETER DESCRIPTION
log_context_builder

A callable that builds the log context.

TYPE: Callable[[StreamMessage[Any]], Dict[str, str]]

description

Optional description of the instance.

TYPE: Optional[str] DEFAULT: None

title

Optional title of the instance.

TYPE: Optional[str] DEFAULT: None

include_in_schema

Whether to include the instance in the schema.

TYPE: bool DEFAULT: True

Source code in faststream/broker/handler.py
def __init__(
    self,
    *,
    log_context_builder: Callable[[StreamMessage[Any]], Dict[str, str]],
    description: Optional[str] = None,
    title: Optional[str] = None,
    include_in_schema: bool = True,
) -> None:
    """Set up the handler's runtime state and its AsyncAPI metadata.

    Args:
        log_context_builder: Callable that takes a stream message and
            returns a ``str -> str`` mapping; stored unchanged.
        description: Description stored on the handler (``None`` if omitted).
        title: Title stored on the handler (``None`` if omitted).
        include_in_schema: Flag stored unchanged on the handler.

    """
    # AsyncAPI information
    self.include_in_schema = include_in_schema
    self._title = title
    self._description = description

    # Runtime state: a fresh handler is not running and starts with empty
    # call and middleware lists.
    self.running = False
    self.log_context_builder = log_context_builder
    self.global_middlewares = []
    self.calls = []  # type: ignore[assignment]

call_name property #

call_name: str

Returns the name of the handler call.

calls instance-attribute #

calls: Union[
    List[
        Tuple[
            HandlerCallWrapper[
                MsgType, Any, SendableMessage
            ],
            Callable[[StreamMessage[MsgType]], bool],
            SyncParser[MsgType, StreamMessage[MsgType]],
            SyncDecoder[StreamMessage[MsgType]],
            Sequence[Callable[[Any], BaseMiddleware]],
            CallModel[Any, SendableMessage],
        ]
    ],
    List[
        Tuple[
            HandlerCallWrapper[
                MsgType, Any, SendableMessage
            ],
            Callable[
                [StreamMessage[MsgType]], Awaitable[bool]
            ],
            AsyncParser[MsgType, StreamMessage[MsgType]],
            AsyncDecoder[StreamMessage[MsgType]],
            Sequence[Callable[[Any], BaseMiddleware]],
            CallModel[Any, SendableMessage],
        ]
    ],
] = []

description property #

description: Optional[str]

Returns the description of the handler.

global_middlewares instance-attribute #

global_middlewares: Sequence[
    Callable[[Any], BaseMiddleware]
] = []

include_in_schema instance-attribute #

include_in_schema = include_in_schema

log_context_builder instance-attribute #

log_context_builder = log_context_builder

running instance-attribute #

running = False

consume abstractmethod #

consume(msg: MsgType) -> SendableMessage

Consume a message.

PARAMETER DESCRIPTION
msg

The message to be consumed.

TYPE: MsgType

RETURNS DESCRIPTION
SendableMessage

The sendable message.

RAISES DESCRIPTION
NotImplementedError

If the method is not implemented.

Source code in faststream/broker/handler.py
@abstractmethod
def consume(self, msg: MsgType) -> SendableMessage:
    """Process a single incoming message (abstract hook).

    This base implementation performs no work and unconditionally raises.

    Args:
        msg: The message to be consumed.

    Returns:
        The sendable message, per the return annotation; this base
        version never actually returns.

    Raises:
        NotImplementedError: Always, in this base implementation.

    """
    raise NotImplementedError

get_payloads #

get_payloads() -> List[Tuple[AnyDict, str]]

Get the payloads of the handler.

Source code in faststream/broker/handler.py
def get_payloads(self) -> List[Tuple[AnyDict, str]]:
    """Build one ``(body, name)`` pair for every entry in ``self.calls``.

    For each registered call, ``body`` is the result of
    ``parse_handler_params`` applied to the entry's last element with the
    prefix ``"<title or call_name>:Message"``, and ``name`` is the
    camel-cased ``__name__`` of the unwrapped original callable.

    Returns:
        The pairs, in the same order as ``self.calls``.
    """
    # The prefix is deliberately computed per item rather than hoisted, so
    # ``self.call_name`` is never read when there are no calls.
    return [
        (
            parse_handler_params(
                model, prefix=f"{self._title or self.call_name}:Message"
            ),
            to_camelcase(unwrap(wrapper._original_call).__name__),
        )
        for (
            wrapper, _filter, _parser, _decoder, _middlewares, model
        ) in self.calls
    ]

name #

name: str

Returns the name of the API operation.

Source code in faststream/asyncapi/base.py
@abstractproperty
def name(self) -> str:
    """Name of the API operation (abstract; this base getter always raises)."""
    # NOTE(review): ``abc.abstractproperty`` has been deprecated since
    # Python 3.3 in favour of ``@property`` stacked on ``@abstractmethod``.
    # Kept as-is here because the module's imports are not visible.
    raise NotImplementedError

schema #

schema() -> Dict[str, Channel]

Returns the schema of the API operation as a dictionary of channel names and channel objects.

Source code in faststream/asyncapi/base.py
def schema(self) -> Dict[str, Channel]:  # pragma: no cover
    """Return the operation's channels keyed by channel name.

    This default implementation reports no channels: it returns a new,
    empty dictionary on every call.
    """
    no_channels: Dict[str, Channel] = {}
    return no_channels