HandlerItem(*, handler, filter, item_parser, item_decoder, item_middlewares, dependencies)
Bases: SetupAble
, Generic[MsgType]
A class representing handler overloaded item.
Source code in faststream/broker/subscriber/call_item.py
| def __init__(
self,
*,
handler: "HandlerCallWrapper[MsgType, ..., Any]",
filter: "AsyncFilter[StreamMessage[MsgType]]",
item_parser: Optional["CustomCallable"],
item_decoder: Optional["CustomCallable"],
item_middlewares: Iterable["SubscriberMiddleware[StreamMessage[MsgType]]"],
dependencies: Iterable["Depends"],
) -> None:
self.handler = handler
self.filter = filter
self.item_parser = item_parser
self.item_decoder = item_decoder
self.item_middlewares = item_middlewares
self.dependencies = dependencies
self.dependant = None
|
handler instance-attribute
filter instance-attribute
item_parser instance-attribute
item_decoder instance-attribute
item_middlewares instance-attribute
dependencies instance-attribute
dependant instance-attribute
call_name property
Returns the name of the original call.
description property
Returns the description of original call.
setup
setup(*, parser, decoder, broker_dependencies, apply_types, is_validate, _get_dependant, _call_decorators)
Source code in faststream/broker/subscriber/call_item.py
| @override
def setup( # type: ignore[override]
self,
*,
parser: "AsyncCallable",
decoder: "AsyncCallable",
broker_dependencies: Iterable["Depends"],
apply_types: bool,
is_validate: bool,
_get_dependant: Optional[Callable[..., Any]],
_call_decorators: Iterable["Decorator"],
) -> None:
if self.dependant is None:
self.item_parser = parser
self.item_decoder = decoder
dependencies = (*broker_dependencies, *self.dependencies)
dependant = self.handler.set_wrapped(
apply_types=apply_types,
is_validate=is_validate,
dependencies=dependencies,
_get_dependant=_get_dependant,
_call_decorators=_call_decorators,
)
if _get_dependant is None:
self.dependant = dependant
else:
self.dependant = _get_dependant(
self.handler._original_call,
dependencies,
)
|
is_suitable async
Check is message suite for current filter.
Source code in faststream/broker/subscriber/call_item.py
| async def is_suitable(
self,
msg: MsgType,
cache: Dict[Any, Any],
) -> Optional["StreamMessage[MsgType]"]:
"""Check is message suite for current filter."""
if not (parser := cast(Optional["AsyncCallable"], self.item_parser)) or not (
decoder := cast(Optional["AsyncCallable"], self.item_decoder)
):
raise SetupError("You should setup `HandlerItem` at first.")
message = cache[parser] = cast(
"StreamMessage[MsgType]", cache.get(parser) or await parser(msg)
)
message._decoded_body = cache[decoder] = cache.get(decoder) or await decoder(
message
)
if await self.filter(message):
return message
return None
|
call async
call(message, _extra_middlewares)
Execute wrapped handler with consume middlewares.
Source code in faststream/broker/subscriber/call_item.py
| async def call(
self,
/,
message: "StreamMessage[MsgType]",
_extra_middlewares: Iterable["SubscriberMiddleware[Any]"],
) -> Any:
"""Execute wrapped handler with consume middlewares."""
call: AsyncFuncAny = self.handler.call_wrapped
for middleware in chain(self.item_middlewares, _extra_middlewares):
call = partial(middleware, call)
try:
result = await call(message)
except (IgnoredException, SystemExit):
self.handler.trigger()
raise
except Exception as e:
self.handler.trigger(error=e)
raise e
else:
self.handler.trigger(result=result)
return result
|