Bases: BaseHandler[MsgType]
A class representing an asynchronous handler.
| METHOD | DESCRIPTION |
| --- | --- |
| add_call | Adds a new call to the list of calls. |
| consume | Consumes a message and returns a sendable message. |
| start | Starts the handler. |
| close | Closes the handler. |
Initialize a new instance of the class.
Source code in faststream/broker/handler.py
def __init__(
    self,
    *,
    log_context_builder: Callable[[StreamMessage[Any]], Dict[str, str]],
    description: Optional[str] = None,
    title: Optional[str] = None,
    include_in_schema: bool = True,
    graceful_timeout: Optional[float] = None,
) -> None:
    """Initialize a new instance of the class."""
    super().__init__(
        log_context_builder=log_context_builder,
        description=description,
        title=title,
        include_in_schema=include_in_schema,
    )
    self.lock = MultiLock()
    self.graceful_timeout = graceful_timeout
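The only required keyword argument is `log_context_builder`, a callable that maps an incoming `StreamMessage` to the key/value pairs placed in the logging context (see `consume` below). A minimal sketch of such a callable; the field names are illustrative assumptions, not prescribed by FastStream:

```python
from typing import Any, Dict


def build_log_context(message: Any) -> Dict[str, str]:
    """Illustrative log-context builder (field names are an assumption)."""
    return {
        "correlation_id": str(getattr(message, "correlation_id", "")),
        "handler": "my_handler",
    }
```

A broker-specific subclass would pass this callable as `log_context_builder=build_log_context` when calling `super().__init__`.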
call_name property
Returns the name of the handler call.
calls instance-attribute
calls: List[
    Tuple[
        HandlerCallWrapper[MsgType, Any, SendableMessage],
        Callable[[StreamMessage[MsgType]], Awaitable[bool]],
        AsyncParser[MsgType, Any],
        AsyncDecoder[StreamMessage[MsgType]],
        Sequence[Callable[[Any], BaseMiddleware]],
        CallModel[Any, SendableMessage],
    ]
]
description property
Returns the description of the handler.
global_middlewares instance-attribute
graceful_timeout instance-attribute
include_in_schema instance-attribute
log_context_builder instance-attribute
running instance-attribute
add_call
add_call(
    *,
    handler: HandlerCallWrapper[
        MsgType, P_HandlerParams, T_HandlerReturn
    ],
    parser: CustomParser[MsgType, Any],
    decoder: CustomDecoder[Any],
    dependant: CallModel[P_HandlerParams, T_HandlerReturn],
    filter: Filter[StreamMessage[MsgType]],
    middlewares: Optional[
        Sequence[Callable[[Any], BaseMiddleware]]
    ]
) -> None
Adds a call to the handler.
| PARAMETER | DESCRIPTION |
| --- | --- |
| handler | The handler call wrapper. TYPE: HandlerCallWrapper[MsgType, P_HandlerParams, T_HandlerReturn] |
| parser | The custom parser. TYPE: CustomParser[MsgType, Any] |
| decoder | The custom decoder. TYPE: CustomDecoder[Any] |
| dependant | The call model. TYPE: CallModel[P_HandlerParams, T_HandlerReturn] |
| filter | The filter for stream messages. TYPE: Filter[StreamMessage[MsgType]] |
| middlewares | Optional sequence of middlewares. TYPE: Optional[Sequence[Callable[[Any], BaseMiddleware]]] |
Source code in faststream/broker/handler.py
def add_call(
    self,
    *,
    handler: HandlerCallWrapper[MsgType, P_HandlerParams, T_HandlerReturn],
    parser: CustomParser[MsgType, Any],
    decoder: CustomDecoder[Any],
    dependant: CallModel[P_HandlerParams, T_HandlerReturn],
    filter: Filter[StreamMessage[MsgType]],
    middlewares: Optional[Sequence[Callable[[Any], BaseMiddleware]]],
) -> None:
    """Adds a call to the handler.

    Args:
        handler: The handler call wrapper.
        parser: The custom parser.
        decoder: The custom decoder.
        dependant: The call model.
        filter: The filter for stream messages.
        middlewares: Optional sequence of middlewares.

    Returns:
        None
    """
    self.calls.append(
        (
            handler,
            to_async(filter),
            to_async(parser) if parser else None,  # type: ignore[arg-type]
            to_async(decoder) if decoder else None,  # type: ignore[arg-type]
            middlewares or (),
            dependant,
        )
    )
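Each registered call carries its own filter, parser, decoder, and middlewares; during `consume` (below), the first call whose filter accepts the decoded message is the one executed. Because `add_call` wraps callables with `to_async`, filters may be written as either sync or async functions. A hedged sketch of two such filters, with names and logic that are illustrative only:

```python
from typing import Any


def json_only(message: Any) -> bool:
    # Sync filter: accept messages whose decoded body is a dict.
    # add_call converts it to an async callable via to_async.
    return isinstance(message.decoded_body, dict)


async def catch_all(message: Any) -> bool:
    # Async filter: accept anything no earlier call has processed yet.
    return not message.processed
```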
close abstractmethod async
Close the handler.
Source code in faststream/broker/handler.py
@abstractmethod
async def close(self) -> None:
    """Close the handler."""
    self.running = False
    await self.lock.wait_release(self.graceful_timeout)
consume async
consume(msg: MsgType) -> SendableMessage
Consume a message asynchronously.
| PARAMETER | DESCRIPTION |
| --- | --- |
| msg | The message to be consumed. TYPE: MsgType |

| RETURNS | DESCRIPTION |
| --- | --- |
| SendableMessage | The sendable message returned by the handler call. |

| RAISES | DESCRIPTION |
| --- | --- |
| StopConsume | If the consumption needs to be stopped. |
| Exception | If an error occurs during consumption. |
Source code in faststream/broker/handler.py
@override
async def consume(self, msg: MsgType) -> SendableMessage:  # type: ignore[override]
    """Consume a message asynchronously.

    Args:
        msg: The message to be consumed.

    Returns:
        The sendable message.

    Raises:
        StopConsume: If the consumption needs to be stopped.
        Exception: If an error occurs during consumption.
    """
    result: Optional[WrappedReturn[SendableMessage]] = None
    result_msg: SendableMessage = None

    if not self.running:
        return result_msg

    log_context_tag: Optional["Token[Any]"] = None
    async with AsyncExitStack() as stack:
        stack.enter_context(self.lock)
        stack.enter_context(context.scope("handler_", self))

        gl_middlewares: List[BaseMiddleware] = [
            await stack.enter_async_context(m(msg)) for m in self.global_middlewares
        ]

        logged = False
        processed = False
        for handler, filter_, parser, decoder, middlewares, _ in self.calls:
            local_middlewares: List[BaseMiddleware] = [
                await stack.enter_async_context(m(msg)) for m in middlewares
            ]

            all_middlewares = gl_middlewares + local_middlewares

            # TODO: add parser & decoder caches
            message = await parser(msg)

            if not logged:  # pragma: no branch
                log_context_tag = context.set_local(
                    "log_context",
                    self.log_context_builder(message),
                )

            message.decoded_body = await decoder(message)
            message.processed = processed

            if await filter_(message):
                assert (  # nosec B101
                    not processed
                ), "You can't process a message with multiple consumers"

                try:
                    async with AsyncExitStack() as consume_stack:
                        for m_consume in all_middlewares:
                            message.decoded_body = (
                                await consume_stack.enter_async_context(
                                    m_consume.consume_scope(message.decoded_body)
                                )
                            )

                        result = await cast(
                            Awaitable[Optional[WrappedReturn[SendableMessage]]],
                            handler.call_wrapped(message),
                        )

                    if result is not None:
                        result_msg, pub_response = result

                        # TODO: suppress all publishing errors and raise them after all publishers will be tried
                        for publisher in (pub_response, *handler._publishers):
                            if publisher is not None:
                                async with AsyncExitStack() as pub_stack:
                                    result_to_send = result_msg

                                    for m_pub in all_middlewares:
                                        result_to_send = (
                                            await pub_stack.enter_async_context(
                                                m_pub.publish_scope(result_to_send)
                                            )
                                        )

                                    await publisher.publish(
                                        message=result_to_send,
                                        correlation_id=message.correlation_id,
                                    )

                except StopConsume:
                    await self.close()
                    handler.trigger()

                except HandlerException as e:  # pragma: no cover
                    handler.trigger()
                    raise e

                except Exception as e:
                    handler.trigger(error=e)
                    raise e

                else:
                    handler.trigger(result=result[0] if result else None)
                    message.processed = processed = True
                    if IS_OPTIMIZED:  # pragma: no cover
                        break

        assert not self.running or processed, "You have to consume message"  # nosec B101

    if log_context_tag is not None:
        context.reset_local("log_context", log_context_tag)

    return result_msg
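`consume` enters every global and call-local middleware as an async context manager around the raw message, passes the decoded body through each middleware's `consume_scope` before invoking the handler, and pushes any result through `publish_scope` before publishing. Below is a minimal, self-contained sketch of an object satisfying that contract; it mirrors only the hooks used in the source above and does not subclass FastStream's `BaseMiddleware`:

```python
from contextlib import asynccontextmanager
from typing import Any, AsyncIterator


class LoggingMiddlewareSketch:
    """Illustrative middleware-shaped object (an assumption, not FastStream API)."""

    def __init__(self, raw_message: Any) -> None:
        # consume() instantiates each middleware with the raw broker message.
        self.raw_message = raw_message

    async def __aenter__(self) -> "LoggingMiddlewareSketch":
        return self

    async def __aexit__(self, *exc_info: Any) -> None:
        return None

    @asynccontextmanager
    async def consume_scope(self, body: Any) -> AsyncIterator[Any]:
        # Entered before the handler call; the yielded value replaces decoded_body.
        print("consuming:", body)
        yield body

    @asynccontextmanager
    async def publish_scope(self, result: Any) -> AsyncIterator[Any]:
        # Entered around each publish; the yielded value replaces the outgoing message.
        print("publishing:", result)
        yield result
```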
get_payloads
Get the payloads of the handler.
Source code in faststream/broker/handler.py
def get_payloads(self) -> List[Tuple[AnyDict, str]]:
    """Get the payloads of the handler."""
    payloads: List[Tuple[AnyDict, str]] = []

    for h, _, _, _, _, dep in self.calls:
        body = parse_handler_params(
            dep, prefix=f"{self._title or self.call_name}:Message"
        )
        payloads.append((body, to_camelcase(unwrap(h._original_call).__name__)))

    return payloads
name
Returns the name of the API operation.
Source code in faststream/asyncapi/base.py
@abstractproperty
def name(self) -> str:
    """Returns the name of the API operation."""
    raise NotImplementedError()
schema
Returns the schema of the API operation as a dictionary of channel names and channel objects.
Source code in faststream/asyncapi/base.py
def schema(self) -> Dict[str, Channel]:  # pragma: no cover
    """Returns the schema of the API operation as a dictionary of channel names and channel objects."""
    return {}
start abstractmethod async
Start the handler.
Source code in faststream/broker/handler.py
@abstractmethod
async def start(self) -> None:
    """Start the handler."""
    self.running = True