Skip to content

BaseHandler

faststream.broker.handler.BaseHandler #

BaseHandler(
    *,
    log_context_builder: Callable[
        [StreamMessage[Any]], Dict[str, str]
    ],
    description: Optional[str] = None,
    title: Optional[str] = None
)

Bases: AsyncAPIOperation, Generic[MsgType]

A base handler class for asynchronous API operations.

METHOD DESCRIPTION
__init__

Initializes the BaseHandler object.

name

Returns the name of the handler.

call_name

Returns the name of the handler call.

description

Returns the description of the handler.

consume

Abstract method to consume a message.

Note: This class inherits from AsyncAPIOperation and is a generic class with type parameter MsgType.

Note

The above docstring is autogenerated by docstring-gen library (https://docstring-gen.airt.ai)

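Initialize a new instance of the class. Besides the optional parameters listed below, the constructor requires the keyword-only log_context_builder callable (it takes a StreamMessage and returns a Dict[str, str]), which is stored on the instance unchanged.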

PARAMETER DESCRIPTION
description

Optional description of the instance.

TYPE: Optional[str] DEFAULT: None

title

Optional title of the instance.

TYPE: Optional[str] DEFAULT: None

Note

The above docstring is autogenerated by docstring-gen library (https://docstring-gen.airt.ai)

Source code in faststream/broker/handler.py
def __init__(
    self,
    *,
    log_context_builder: Callable[[StreamMessage[Any]], Dict[str, str]],
    description: Optional[str] = None,
    title: Optional[str] = None,
):
    """Set up the initial state of the handler.

    Args:
        log_context_builder: Callable that receives a stream message and
            returns a ``Dict[str, str]``; stored on the instance as-is.
        description: Optional description, kept in ``_description``.
        title: Optional title, kept in ``_title``.
    """
    self.log_context_builder = log_context_builder
    # AsyncAPI information
    self._title = title
    self._description = description
    # Both collections start out empty.
    self.global_middlewares = []
    self.calls = []  # type: ignore[assignment]

call_name property #

call_name: str

calls instance-attribute #

calls: Union[
    List[
        Tuple[
            HandlerCallWrapper[
                MsgType, Any, SendableMessage
            ],
            Callable[[StreamMessage[MsgType]], bool],
            SyncParser[MsgType, StreamMessage[MsgType]],
            SyncDecoder[StreamMessage[MsgType]],
            Sequence[Callable[[Any], BaseMiddleware]],
            CallModel[Any, SendableMessage],
        ]
    ],
    List[
        Tuple[
            HandlerCallWrapper[
                MsgType, Any, SendableMessage
            ],
            Callable[
                [StreamMessage[MsgType]], Awaitable[bool]
            ],
            AsyncParser[MsgType, StreamMessage[MsgType]],
            AsyncDecoder[StreamMessage[MsgType]],
            Sequence[Callable[[Any], BaseMiddleware]],
            CallModel[Any, SendableMessage],
        ]
    ],
] = []

description property #

description: Optional[str]

global_middlewares instance-attribute #

global_middlewares: Sequence[
    Callable[[Any], BaseMiddleware]
] = []

log_context_builder instance-attribute #

log_context_builder = log_context_builder

consume abstractmethod #

consume(msg: MsgType) -> SendableMessage

Consume a message.

PARAMETER DESCRIPTION
msg

The message to be consumed.

TYPE: MsgType

RETURNS DESCRIPTION
SendableMessage

The sendable message.

RAISES DESCRIPTION
NotImplementedError

Always raised by this base implementation; subclasses are expected to override consume.

Note

The above docstring is autogenerated by docstring-gen library (https://docstring-gen.airt.ai)

Source code in faststream/broker/handler.py
@abstractmethod
def consume(self, msg: MsgType) -> SendableMessage:
    """Handle a single incoming message.

    This base implementation has no behavior of its own and always raises.

    Args:
        msg: The message to be consumed.

    Returns:
        The sendable message (per the annotation; never reached here).

    Raises:
        NotImplementedError: Always, in this base implementation.
    """
    raise NotImplementedError

get_payloads #

get_payloads() -> List[Tuple[AnyDict, str]]
Source code in faststream/broker/handler.py
def get_payloads(self) -> List[Tuple[AnyDict, str]]:
    """Collect one ``(body, name)`` pair for every entry in ``self.calls``.

    For each entry, ``body`` is the result of ``parse_handler_params`` applied
    to the entry's last element with the prefix ``"<title or call_name>:Message"``,
    and ``name`` is the camel-cased ``__name__`` of the unwrapped original
    callable held by the entry's first element.

    Returns:
        The pairs in the same order as ``self.calls``; an empty list when
        there are no calls.
    """
    return [
        (
            # Body first, then the name: same evaluation order as building
            # the two values in separate statements.
            parse_handler_params(
                call_model, prefix=f"{self._title or self.call_name}:Message"
            ),
            to_camelcase(unwrap(wrapper._original_call).__name__),
        )
        for wrapper, _, _, _, _, call_model in self.calls
    ]

name #

name() -> str
Source code in faststream/asyncapi/base.py
@abstractproperty
def name(self) -> str:
    """Return the name; abstract, so this base version always raises.

    Raises:
        NotImplementedError: Always, in this base implementation.
    """
    raise NotImplementedError

schema #

schema() -> Dict[str, Channel]
Source code in faststream/asyncapi/base.py
def schema(self) -> Dict[str, Channel]:  # pragma: no cover
    """Return a new, empty mapping; this implementation adds no entries."""
    return dict()

Last update: 2023-11-13