Skip to content

BaseHandler

faststream.broker.handler.BaseHandler #

BaseHandler(*, log_context_builder: Callable[[StreamMessage[Any]], Dict[str, str]], description: Optional[str] = None, title: Optional[str] = None, include_in_schema: bool = True)

Bases: AsyncAPIOperation, Generic[MsgType]

A base handler class for asynchronous API operations.

METHOD DESCRIPTION
__init__

Initializes the BaseHandler object.

name

Returns the name of the handler.

call_name

Returns the name of the handler call.

description

Returns the description of the handler.

consume

Abstract method to consume a message.

Note: This class inherits from AsyncAPIOperation and is a generic class with type parameter MsgType.

Initialize a new instance of the class.

PARAMETER DESCRIPTION
log_context_builder

A callable that builds the log context.

TYPE: Callable[[StreamMessage[Any]], Dict[str, str]]

description

Optional description of the instance.

TYPE: Optional[str] DEFAULT: None

title

Optional title of the instance.

TYPE: Optional[str] DEFAULT: None

include_in_schema

Whether to include the instance in the schema.

TYPE: bool DEFAULT: True

Source code in faststream/broker/handler.py
def __init__(
    self,
    *,
    log_context_builder: Callable[[StreamMessage[Any]], Dict[str, str]],
    description: Optional[str] = None,
    title: Optional[str] = None,
    include_in_schema: bool = True,
) -> None:
    """Set up a handler with no registered calls and ``running`` unset.

    Args:
        log_context_builder: Callable that takes a stream message and
            returns a ``Dict[str, str]``; stored on the instance unchanged.
        description: Stored as ``_description``; defaults to ``None``.
        title: Stored as ``_title``; defaults to ``None``.
        include_in_schema: Flag stored on the instance; defaults to ``True``.

    """
    # AsyncAPI information
    self.include_in_schema = include_in_schema
    self._title = title
    self._description = description

    # Runtime state: the handler starts with running=False.
    self.running = False
    self.log_context_builder = log_context_builder

    # Both collections start empty and are filled in after construction.
    self.global_middlewares = []
    self.calls = []  # type: ignore[assignment]

call_name property #

call_name: str

Returns the name of the handler call.

calls instance-attribute #

calls: Union[List[Tuple[HandlerCallWrapper[MsgType, Any, SendableMessage], Callable[[StreamMessage[MsgType]], bool], SyncParser[MsgType, StreamMessage[MsgType]], SyncDecoder[StreamMessage[MsgType]], Sequence[Callable[[Any], BaseMiddleware]], CallModel[Any, SendableMessage]]], List[Tuple[HandlerCallWrapper[MsgType, Any, SendableMessage], Callable[[StreamMessage[MsgType]], Awaitable[bool]], AsyncParser[MsgType, StreamMessage[MsgType]], AsyncDecoder[StreamMessage[MsgType]], Sequence[Callable[[Any], BaseMiddleware]], CallModel[Any, SendableMessage]]]] = []

description property #

description: Optional[str]

Returns the description of the handler.

global_middlewares instance-attribute #

global_middlewares: Sequence[Callable[[Any], BaseMiddleware]] = []

include_in_schema instance-attribute #

include_in_schema = include_in_schema

log_context_builder instance-attribute #

log_context_builder = log_context_builder

running instance-attribute #

running = False

consume abstractmethod #

consume(msg: MsgType) -> SendableMessage

Consume a message.

PARAMETER DESCRIPTION
msg

The message to be consumed.

TYPE: MsgType

RETURNS DESCRIPTION
SendableMessage

The sendable message.

RAISES DESCRIPTION
NotImplementedError

If the method is not implemented.

Source code in faststream/broker/handler.py
@abstractmethod
def consume(self, msg: MsgType) -> SendableMessage:
    """Process one incoming message; concrete handlers must override this.

    Args:
        msg: The message to be consumed.

    Returns:
        The sendable message produced by an overriding implementation.

    Raises:
        NotImplementedError: Always, in this base implementation.

    """
    raise NotImplementedError

get_payloads #

get_payloads() -> List[Tuple[AnyDict, str]]

Get the payloads of the handler.

Source code in faststream/broker/handler.py
def get_payloads(self) -> List[Tuple[AnyDict, str]]:
    """Build one ``(payload, name)`` pair for every entry in ``self.calls``.

    Returns:
        A list of tuples. The first item is the result of
        ``parse_handler_params`` for the entry's last element, using the
        prefix ``"<title or call_name>:Message"``. The second item is the
        ``__name__`` of the unwrapped original callable passed through
        ``to_camelcase``.

    """
    return [
        (
            # The prefix is built per entry, so ``call_name`` is never
            # evaluated when there are no calls.
            parse_handler_params(
                model,
                prefix=f"{self._title or self.call_name}:Message",
            ),
            to_camelcase(unwrap(wrapper._original_call).__name__),
        )
        for wrapper, _, _, _, _, model in self.calls
    ]

name #

name: str

Returns the name of the API operation.

Source code in faststream/asyncapi/base.py
@abstractproperty
def name(self) -> str:
    """Name of the API operation; subclasses must provide it."""
    raise NotImplementedError

schema #

schema() -> Dict[str, Channel]

Returns the schema of the API operation as a dictionary of channel names and channel objects.

Source code in faststream/asyncapi/base.py
def schema(self) -> Dict[str, Channel]:  # pragma: no cover
    """Map channel names to channel objects for this API operation.

    This implementation returns a new empty dictionary on every call.
    """
    channels: Dict[str, Channel] = {}
    return channels