Skip to content

AsyncHandler

faststream.broker.handler.AsyncHandler #

AsyncHandler(
    *,
    log_context_builder: Callable[
        [StreamMessage[Any]], Dict[str, str]
    ],
    description: Optional[str] = None,
    title: Optional[str] = None
)

Bases: BaseHandler[MsgType]

A class representing an asynchronous handler.

METHOD DESCRIPTION
add_call

adds a new call to the list of calls

consume

consumes a message and returns a sendable message

start

starts the handler

close

closes the handler

Note

The above docstring is autogenerated by docstring-gen library (https://docstring-gen.airt.ai)

Initialize a new instance of the class.

PARAMETER DESCRIPTION
description

Optional description of the instance.

TYPE: Optional[str] DEFAULT: None

title

Optional title of the instance.

TYPE: Optional[str] DEFAULT: None

Note

The above docstring is autogenerated by docstring-gen library (https://docstring-gen.airt.ai)

Source code in faststream/broker/handler.py
def __init__(
    self,
    *,
    log_context_builder: Callable[[StreamMessage[Any]], Dict[str, str]],
    description: Optional[str] = None,
    title: Optional[str] = None,
):
    """Initialize a new instance of the class.

    Args:
        description: Optional description of the instance.
        title: Optional title of the instance.
    !!! note

        The above docstring is autogenerated by docstring-gen library (https://docstring-gen.airt.ai)
    """
    self.calls = []  # type: ignore[assignment]
    self.global_middlewares = []
    # AsyncAPI information
    self._description = description
    self._title = title
    self.log_context_builder = log_context_builder

call_name property #

call_name: str

calls instance-attribute #

calls: List[
    Tuple[
        HandlerCallWrapper[MsgType, Any, SendableMessage],
        Callable[[StreamMessage[MsgType]], Awaitable[bool]],
        AsyncParser[MsgType, Any],
        AsyncDecoder[StreamMessage[MsgType]],
        Sequence[Callable[[Any], BaseMiddleware]],
        CallModel[Any, SendableMessage],
    ]
]

description property #

description: Optional[str]

global_middlewares instance-attribute #

global_middlewares: Sequence[
    Callable[[Any], BaseMiddleware]
] = []

log_context_builder instance-attribute #

log_context_builder = log_context_builder

add_call #

add_call(
    *,
    handler: HandlerCallWrapper[
        MsgType, P_HandlerParams, T_HandlerReturn
    ],
    parser: CustomParser[MsgType, Any],
    decoder: CustomDecoder[Any],
    dependant: CallModel[P_HandlerParams, T_HandlerReturn],
    filter: Filter[StreamMessage[MsgType]],
    middlewares: Optional[
        Sequence[Callable[[Any], BaseMiddleware]]
    ]
) -> None

Adds a call to the handler.

PARAMETER DESCRIPTION
handler

The handler call wrapper.

TYPE: HandlerCallWrapper[MsgType, P_HandlerParams, T_HandlerReturn]

parser

The custom parser.

TYPE: CustomParser[MsgType, Any]

decoder

The custom decoder.

TYPE: CustomDecoder[Any]

dependant

The call model.

TYPE: CallModel[P_HandlerParams, T_HandlerReturn]

filter

The filter for stream messages.

TYPE: Filter[StreamMessage[MsgType]]

middlewares

Optional sequence of middlewares.

TYPE: Optional[Sequence[Callable[[Any], BaseMiddleware]]]

RETURNS DESCRIPTION
None

None

Note

The above docstring is autogenerated by docstring-gen library (https://docstring-gen.airt.ai)

Source code in faststream/broker/handler.py
def add_call(
    self,
    *,
    handler: HandlerCallWrapper[MsgType, P_HandlerParams, T_HandlerReturn],
    parser: CustomParser[MsgType, Any],
    decoder: CustomDecoder[Any],
    dependant: CallModel[P_HandlerParams, T_HandlerReturn],
    filter: Filter[StreamMessage[MsgType]],
    middlewares: Optional[Sequence[Callable[[Any], BaseMiddleware]]],
) -> None:
    """Adds a call to the handler.

    Args:
        handler: The handler call wrapper.
        parser: The custom parser.
        decoder: The custom decoder.
        dependant: The call model.
        filter: The filter for stream messages.
        middlewares: Optional sequence of middlewares.

    Returns:
        None
    !!! note

        The above docstring is autogenerated by docstring-gen library (https://docstring-gen.airt.ai)
    """
    self.calls.append(
        (
            handler,
            to_async(filter),
            to_async(parser) if parser else None,  # type: ignore[arg-type]
            to_async(decoder) if decoder else None,  # type: ignore[arg-type]
            middlewares or (),
            dependant,
        )
    )

close abstractmethod async #

close() -> None
Source code in faststream/broker/handler.py
@abstractmethod
async def close(self) -> None:
    raise NotImplementedError()

consume async #

consume(msg: MsgType) -> SendableMessage

Consume a message asynchronously.

PARAMETER DESCRIPTION
msg

The message to be consumed.

TYPE: MsgType

RETURNS DESCRIPTION
SendableMessage

The sendable message.

RAISES DESCRIPTION
StopConsume

If the consumption needs to be stopped.

RAISES DESCRIPTION
Exception

If an error occurs during consumption.

Note

The above docstring is autogenerated by docstring-gen library (https://docstring-gen.airt.ai)

Source code in faststream/broker/handler.py
@override
async def consume(self, msg: MsgType) -> SendableMessage:  # type: ignore[override]
    """Consume a message asynchronously.

    Args:
        msg: The message to be consumed.

    Returns:
        The sendable message.

    Raises:
        StopConsume: If the consumption needs to be stopped.

    Raises:
        Exception: If an error occurs during consumption.
    !!! note

        The above docstring is autogenerated by docstring-gen library (https://docstring-gen.airt.ai)
    """
    result: Optional[WrappedReturn[SendableMessage]] = None
    result_msg: SendableMessage = None

    async with AsyncExitStack() as stack:
        gl_middlewares: List[BaseMiddleware] = []

        stack.enter_context(context.scope("handler_", self))

        for m in self.global_middlewares:
            gl_middlewares.append(await stack.enter_async_context(m(msg)))

        logged = False
        processed = False
        for handler, filter_, parser, decoder, middlewares, _ in self.calls:
            local_middlewares: List[BaseMiddleware] = []
            for local_m in middlewares:
                local_middlewares.append(
                    await stack.enter_async_context(local_m(msg))
                )

            all_middlewares = gl_middlewares + local_middlewares

            # TODO: add parser & decoder caches
            message = await parser(msg)

            if not logged:  # pragma: no branch
                log_context_tag = context.set_local(
                    "log_context", self.log_context_builder(message)
                )

            message.decoded_body = await decoder(message)
            message.processed = processed

            if await filter_(message):
                assert (  # nosec B101
                    not processed
                ), "You can't proccess a message with multiple consumers"

                try:
                    async with AsyncExitStack() as consume_stack:
                        for m_consume in all_middlewares:
                            message.decoded_body = (
                                await consume_stack.enter_async_context(
                                    m_consume.consume_scope(message.decoded_body)
                                )
                            )

                        result = await cast(
                            Awaitable[Optional[WrappedReturn[SendableMessage]]],
                            handler.call_wrapped(message),
                        )

                    if result is not None:
                        result_msg, pub_response = result

                        # TODO: suppress all publishing errors and raise them after all publishers will be tried
                        for publisher in (pub_response, *handler._publishers):
                            if publisher is not None:
                                async with AsyncExitStack() as pub_stack:
                                    result_to_send = result_msg

                                    for m_pub in all_middlewares:
                                        result_to_send = (
                                            await pub_stack.enter_async_context(
                                                m_pub.publish_scope(result_to_send)
                                            )
                                        )

                                    await publisher.publish(
                                        message=result_to_send,
                                        correlation_id=message.correlation_id,
                                    )

                except StopConsume:
                    await self.close()
                    handler.trigger()

                except HandlerException as e:  # pragma: no cover
                    handler.trigger()
                    raise e

                except Exception as e:
                    handler.trigger(error=e)
                    raise e

                else:
                    handler.trigger(result=result[0] if result else None)
                    message.processed = processed = True
                    if IS_OPTIMIZED:  # pragma: no cover
                        break

        assert processed, "You have to consume message"  # nosec B101

    context.reset_local("log_context", log_context_tag)
    return result_msg

get_payloads #

get_payloads() -> List[Tuple[AnyDict, str]]
Source code in faststream/broker/handler.py
def get_payloads(self) -> List[Tuple[AnyDict, str]]:
    payloads: List[Tuple[AnyDict, str]] = []

    for h, _, _, _, _, dep in self.calls:
        body = parse_handler_params(
            dep, prefix=f"{self._title or self.call_name}:Message"
        )
        payloads.append((body, to_camelcase(unwrap(h._original_call).__name__)))

    return payloads

name #

name() -> str
Source code in faststream/asyncapi/base.py
@abstractproperty
def name(self) -> str:
    raise NotImplementedError()

schema #

schema() -> Dict[str, Channel]
Source code in faststream/asyncapi/base.py
def schema(self) -> Dict[str, Channel]:  # pragma: no cover
    return {}

start abstractmethod async #

start() -> None
Source code in faststream/broker/handler.py
@abstractmethod
async def start(self) -> None:
    raise NotImplementedError()

Last update: 2023-11-13