Skip to content

HandlerItem

faststream.broker.subscriber.call_item.HandlerItem #

HandlerItem(*, handler, filter, item_parser, item_decoder, item_middlewares, dependencies)

Bases: SetupAble, Generic[MsgType]

A class representing handler overloaded item.

Source code in faststream/broker/subscriber/call_item.py
def __init__(
    self,
    *,
    handler: "HandlerCallWrapper[MsgType, ..., Any]",
    filter: "AsyncFilter[StreamMessage[MsgType]]",
    item_parser: Optional["CustomCallable"],
    item_decoder: Optional["CustomCallable"],
    item_middlewares: Iterable["SubscriberMiddleware[StreamMessage[MsgType]]"],
    dependencies: Iterable["Depends"],
) -> None:
    self.handler = handler
    self.filter = filter
    self.item_parser = item_parser
    self.item_decoder = item_decoder
    self.item_middlewares = item_middlewares
    self.dependencies = dependencies
    self.dependant = None

handler instance-attribute #

handler = handler

filter instance-attribute #

filter = filter

item_parser instance-attribute #

item_parser = item_parser

item_decoder instance-attribute #

item_decoder = item_decoder

item_middlewares instance-attribute #

item_middlewares = item_middlewares

dependencies instance-attribute #

dependencies = dependencies

dependant instance-attribute #

dependant = None

call_name property #

call_name

Returns the name of the original call.

description property #

description

Returns the description of original call.

setup #

setup(*, parser, decoder, broker_dependencies, apply_types, is_validate, _get_dependant, _call_decorators)
Source code in faststream/broker/subscriber/call_item.py
@override
def setup(  # type: ignore[override]
    self,
    *,
    parser: "AsyncCallable",
    decoder: "AsyncCallable",
    broker_dependencies: Iterable["Depends"],
    apply_types: bool,
    is_validate: bool,
    _get_dependant: Optional[Callable[..., Any]],
    _call_decorators: Iterable["Decorator"],
) -> None:
    if self.dependant is None:
        self.item_parser = parser
        self.item_decoder = decoder

        dependencies = (*broker_dependencies, *self.dependencies)

        dependant = self.handler.set_wrapped(
            apply_types=apply_types,
            is_validate=is_validate,
            dependencies=dependencies,
            _get_dependant=_get_dependant,
            _call_decorators=_call_decorators,
        )

        if _get_dependant is None:
            self.dependant = dependant
        else:
            self.dependant = _get_dependant(
                self.handler._original_call,
                dependencies,
            )

is_suitable async #

is_suitable(msg, cache)

Check is message suite for current filter.

Source code in faststream/broker/subscriber/call_item.py
async def is_suitable(
    self,
    msg: MsgType,
    cache: Dict[Any, Any],
) -> Optional["StreamMessage[MsgType]"]:
    """Check is message suite for current filter."""
    if not (parser := cast(Optional["AsyncCallable"], self.item_parser)) or not (
        decoder := cast(Optional["AsyncCallable"], self.item_decoder)
    ):
        raise SetupError("You should setup `HandlerItem` at first.")

    message = cache[parser] = cast(
        "StreamMessage[MsgType]", cache.get(parser) or await parser(msg)
    )

    message._decoded_body = cache[decoder] = cache.get(decoder) or await decoder(
        message
    )

    if await self.filter(message):
        return message

    return None

call async #

call(message, _extra_middlewares)

Execute wrapped handler with consume middlewares.

Source code in faststream/broker/subscriber/call_item.py
async def call(
    self,
    /,
    message: "StreamMessage[MsgType]",
    _extra_middlewares: Iterable["SubscriberMiddleware[Any]"],
) -> Any:
    """Execute wrapped handler with consume middlewares."""
    call: AsyncFuncAny = self.handler.call_wrapped

    for middleware in chain(self.item_middlewares, _extra_middlewares):
        call = partial(middleware, call)

    try:
        result = await call(message)

    except (IgnoredException, SystemExit):
        self.handler.trigger()
        raise

    except Exception as e:
        self.handler.trigger(error=e)
        raise e

    else:
        self.handler.trigger(result=result)
        return result