AsyncHandler(
*,
log_context_builder: Callable[
[StreamMessage[Any]], Dict[str, str]
],
description: Optional[str] = None,
title: Optional[str] = None
)
Bases: BaseHandler[MsgType]
A class representing an asynchronous handler.

| METHOD | DESCRIPTION |
| --- | --- |
| add_call | Adds a new call to the list of calls. |
| consume | Consumes a message and returns a sendable message. |
| start | |
| close | |

Note
The above docstring is autogenerated by docstring-gen library (https://docstring-gen.airt.ai)
Initialize a new instance of the class.

| PARAMETER | DESCRIPTION |
| --- | --- |
| log_context_builder | Callable that builds the logging context from an incoming message. TYPE: Callable[[StreamMessage[Any]], Dict[str, str]] |
| description | Optional description of the instance. TYPE: Optional[str] DEFAULT: None |
| title | Optional title of the instance. TYPE: Optional[str] DEFAULT: None |

Note
The above docstring is autogenerated by docstring-gen library (https://docstring-gen.airt.ai)
Source code in faststream/broker/handler.py
def __init__(
self,
*,
log_context_builder: Callable[[StreamMessage[Any]], Dict[str, str]],
description: Optional[str] = None,
title: Optional[str] = None,
):
"""Initialize a new instance of the class.
Args:
description: Optional description of the instance.
title: Optional title of the instance.
!!! note
The above docstring is autogenerated by docstring-gen library (https://docstring-gen.airt.ai)
"""
self.calls = [] # type: ignore[assignment]
self.global_middlewares = []
# AsyncAPI information
self._description = description
self._title = title
self.log_context_builder = log_context_builder
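
The only required constructor argument is `log_context_builder`: a callable that maps the incoming `StreamMessage` to a flat `Dict[str, str]`, which `consume` installs as the local logging context while the message is handled. A minimal sketch of such a callable (the key names are illustrative, not part of the AsyncHandler contract):

```python
from typing import Any, Dict

from faststream.broker.message import StreamMessage


def build_log_context(message: StreamMessage[Any]) -> Dict[str, str]:
    # Illustrative keys: each broker implementation defines its own
    # context fields (queue/topic name, message id, and so on).
    return {
        "message_id": message.message_id or "",
        "correlation_id": message.correlation_id or "",
    }
```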
calls instance-attribute
calls: List[
Tuple[
HandlerCallWrapper[MsgType, Any, SendableMessage],
Callable[[StreamMessage[MsgType]], Awaitable[bool]],
AsyncParser[MsgType, Any],
AsyncDecoder[StreamMessage[MsgType]],
Sequence[Callable[[Any], BaseMiddleware]],
CallModel[Any, SendableMessage],
]
]
description property
description: Optional[str]
global_middlewares instance-attribute
global_middlewares: Sequence[
Callable[[Any], BaseMiddleware]
] = []
log_context_builder instance-attribute
log_context_builder = log_context_builder
add_call
add_call(
*,
handler: HandlerCallWrapper[
MsgType, P_HandlerParams, T_HandlerReturn
],
parser: CustomParser[MsgType, Any],
decoder: CustomDecoder[Any],
dependant: CallModel[P_HandlerParams, T_HandlerReturn],
filter: Filter[StreamMessage[MsgType]],
middlewares: Optional[
Sequence[Callable[[Any], BaseMiddleware]]
]
) -> None
Adds a call to the handler.

| PARAMETER | DESCRIPTION |
| --- | --- |
| handler | The handler call wrapper. TYPE: HandlerCallWrapper[MsgType, P_HandlerParams, T_HandlerReturn] |
| parser | The custom parser. TYPE: CustomParser[MsgType, Any] |
| decoder | The custom decoder. TYPE: CustomDecoder[Any] |
| dependant | The call model. TYPE: CallModel[P_HandlerParams, T_HandlerReturn] |
| filter | The filter for stream messages. TYPE: Filter[StreamMessage[MsgType]] |
| middlewares | Optional sequence of middlewares. TYPE: Optional[Sequence[Callable[[Any], BaseMiddleware]]] |

Note
The above docstring is autogenerated by docstring-gen library (https://docstring-gen.airt.ai)
Source code in faststream/broker/handler.py
def add_call(
self,
*,
handler: HandlerCallWrapper[MsgType, P_HandlerParams, T_HandlerReturn],
parser: CustomParser[MsgType, Any],
decoder: CustomDecoder[Any],
dependant: CallModel[P_HandlerParams, T_HandlerReturn],
filter: Filter[StreamMessage[MsgType]],
middlewares: Optional[Sequence[Callable[[Any], BaseMiddleware]]],
) -> None:
"""Adds a call to the handler.
Args:
handler: The handler call wrapper.
parser: The custom parser.
decoder: The custom decoder.
dependant: The call model.
filter: The filter for stream messages.
middlewares: Optional sequence of middlewares.
Returns:
None
!!! note
The above docstring is autogenerated by docstring-gen library (https://docstring-gen.airt.ai)
"""
self.calls.append(
(
handler,
to_async(filter),
to_async(parser) if parser else None, # type: ignore[arg-type]
to_async(decoder) if decoder else None, # type: ignore[arg-type]
middlewares or (),
dependant,
)
)
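
`add_call` is invoked by the broker each time a subscriber function is registered for this handler's source; registering several functions with different filters produces several entries in `calls`, which `consume` then tries in order. A user-level sketch of that pattern, assuming the RabbitMQ backend and the public `filter=` option of `subscriber` (queue name and connection URL are placeholders):

```python
from faststream import FastStream
from faststream.rabbit import RabbitBroker

broker = RabbitBroker("amqp://guest:guest@localhost:5672/")  # placeholder URL
app = FastStream(broker)


# Both subscribers target the same queue, so they become two entries in the
# same handler's `calls` list, distinguished only by their filters.
@broker.subscriber(
    "input-queue",
    filter=lambda msg: msg.content_type == "application/json",
)
async def handle_json(payload: dict) -> None:
    ...


@broker.subscriber("input-queue")  # default filter catches everything else
async def handle_other(body: str) -> None:
    ...
```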
close abstractmethod async
Source code in faststream/broker/handler.py
@abstractmethod
async def close(self) -> None:
raise NotImplementedError()
consume async
consume(msg: MsgType) -> SendableMessage
Consume a message asynchronously.

| PARAMETER | DESCRIPTION |
| --- | --- |
| msg | The message to be consumed. TYPE: MsgType |

| RETURNS | DESCRIPTION |
| --- | --- |
| SendableMessage | The sendable message. |

| RAISES | DESCRIPTION |
| --- | --- |
| StopConsume | If the consumption needs to be stopped. |
| Exception | If an error occurs during consumption. |

Note
The above docstring is autogenerated by docstring-gen library (https://docstring-gen.airt.ai)
Source code in faststream/broker/handler.py
@override
async def consume(self, msg: MsgType) -> SendableMessage: # type: ignore[override]
"""Consume a message asynchronously.
Args:
msg: The message to be consumed.
Returns:
The sendable message.
Raises:
StopConsume: If the consumption needs to be stopped.
Raises:
Exception: If an error occurs during consumption.
!!! note
The above docstring is autogenerated by docstring-gen library (https://docstring-gen.airt.ai)
"""
result: Optional[WrappedReturn[SendableMessage]] = None
result_msg: SendableMessage = None
async with AsyncExitStack() as stack:
gl_middlewares: List[BaseMiddleware] = []
stack.enter_context(context.scope("handler_", self))
for m in self.global_middlewares:
gl_middlewares.append(await stack.enter_async_context(m(msg)))
logged = False
processed = False
for handler, filter_, parser, decoder, middlewares, _ in self.calls:
local_middlewares: List[BaseMiddleware] = []
for local_m in middlewares:
local_middlewares.append(
await stack.enter_async_context(local_m(msg))
)
all_middlewares = gl_middlewares + local_middlewares
# TODO: add parser & decoder caches
message = await parser(msg)
if not logged: # pragma: no branch
log_context_tag = context.set_local(
"log_context", self.log_context_builder(message)
)
message.decoded_body = await decoder(message)
message.processed = processed
if await filter_(message):
assert ( # nosec B101
not processed
), "You can't proccess a message with multiple consumers"
try:
async with AsyncExitStack() as consume_stack:
for m_consume in all_middlewares:
message.decoded_body = (
await consume_stack.enter_async_context(
m_consume.consume_scope(message.decoded_body)
)
)
result = await cast(
Awaitable[Optional[WrappedReturn[SendableMessage]]],
handler.call_wrapped(message),
)
if result is not None:
result_msg, pub_response = result
# TODO: suppress all publishing errors and raise them after all publishers will be tried
for publisher in (pub_response, *handler._publishers):
if publisher is not None:
async with AsyncExitStack() as pub_stack:
result_to_send = result_msg
for m_pub in all_middlewares:
result_to_send = (
await pub_stack.enter_async_context(
m_pub.publish_scope(result_to_send)
)
)
await publisher.publish(
message=result_to_send,
correlation_id=message.correlation_id,
)
except StopConsume:
await self.close()
handler.trigger()
except HandlerException as e: # pragma: no cover
handler.trigger()
raise e
except Exception as e:
handler.trigger(error=e)
raise e
else:
handler.trigger(result=result[0] if result else None)
message.processed = processed = True
if IS_OPTIMIZED: # pragma: no cover
break
assert processed, "You have to consume message" # nosec B101
context.reset_local("log_context", log_context_tag)
return result_msg
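
The `except StopConsume` branch above implements stopping consumption from inside a handler: the handler raises the exception, and `consume` closes the handler and still triggers the wrapper so waiting callers are released. A short sketch, assuming `StopConsume` is importable from `faststream.exceptions` and using a RabbitMQ broker as a placeholder backend:

```python
from faststream.exceptions import StopConsume
from faststream.rabbit import RabbitBroker

broker = RabbitBroker("amqp://guest:guest@localhost:5672/")  # placeholder URL


@broker.subscriber("control-queue")
async def handle(command: str) -> None:
    if command == "shutdown":
        # consume() catches StopConsume, awaits self.close(), and still
        # calls handler.trigger(), so in-flight bookkeeping stays consistent.
        raise StopConsume()
```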
get_payloads
get_payloads() -> List[Tuple[AnyDict, str]]
Source code in faststream/broker/handler.py
def get_payloads(self) -> List[Tuple[AnyDict, str]]:
payloads: List[Tuple[AnyDict, str]] = []
for h, _, _, _, _, dep in self.calls:
body = parse_handler_params(
dep, prefix=f"{self._title or self.call_name}:Message"
)
payloads.append((body, to_camelcase(unwrap(h._original_call).__name__)))
return payloads
name
Source code in faststream/asyncapi/base.py
@abstractproperty
def name(self) -> str:
raise NotImplementedError()
schema
schema() -> Dict[str, Channel]
Source code in faststream/asyncapi/base.py
def schema(self) -> Dict[str, Channel]:  # pragma: no cover
return {}
start abstractmethod async
Source code in faststream/broker/handler.py
@abstractmethod
async def start(self) -> None:
raise NotImplementedError()
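
`start` and `close`, together with the abstract `name` property from the AsyncAPI base, are the surface each broker backend has to implement. The following is a purely illustrative sketch of a concrete subclass (the class name and behavior are hypothetical; a real handler would start and cancel an actual consumer task):

```python
from faststream.broker.handler import AsyncHandler


class InMemoryHandler(AsyncHandler[bytes]):  # hypothetical example class
    """Toy handler that only tracks whether it is running."""

    def __init__(self) -> None:
        super().__init__(
            log_context_builder=lambda message: {
                "message_id": message.message_id or "",
            },
        )
        self.running = False

    @property
    def name(self) -> str:
        # `_title` is set by AsyncHandler.__init__ from the `title` argument.
        return self._title or "in-memory-handler"

    async def start(self) -> None:
        self.running = True  # a real handler would begin consuming here

    async def close(self) -> None:
        self.running = False  # a real handler would cancel its consumer here
```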