async def consume_scope(
    self,
    call_next: "AsyncFuncAny",
    msg: "StreamMessage[Any]",
) -> Any:
    """Run the next consumer in the chain while reporting metrics about it.

    When no settings provider is configured the message is handed to
    ``call_next`` untouched and nothing is recorded.

    Otherwise the metrics manager is told, in this order:

    1. how many messages were received and how large they are;
    2. that those messages are now in process;
    3. (only if the handler raises an ``Exception``) the exception type name;
    4. how long the handler took, measured with ``time.perf_counter``;
    5. that the messages are no longer in process;
    6. the final processing status of the messages.

    Args:
        call_next: Awaitable callable that continues the consume chain.
        msg: The incoming message; its ``committed`` attribute is read after
            the handler finished to resolve the processing status.

    Returns:
        Whatever ``call_next`` returned.

    Raises:
        Exception: Any exception raised by ``self.on_consume`` or
            ``call_next`` is re-raised after the metrics were recorded.
    """
    provider = self._settings_provider
    if provider is None:
        return await call_next(msg)

    broker_name = provider.messaging_system
    attrs = provider.get_consume_attrs_from_message(msg)
    # Every metric below is labelled with the same broker/handler pair.
    labels = {"broker": broker_name, "handler": attrs["destination_name"]}

    metrics = self._metrics_manager
    batch_size = attrs["messages_count"]

    metrics.add_received_message(amount=batch_size, **labels)
    metrics.observe_received_messages_size(size=attrs["message_size"], **labels)
    metrics.add_received_message_in_process(amount=batch_size, **labels)

    failure: Optional[Exception] = None
    started_at = time.perf_counter()

    try:
        outcome = await call_next(await self.on_consume(msg))

    except Exception as exc:
        # Keep a reference: the name bound by ``except`` is gone once the
        # clause ends, but the ``finally`` block still needs the exception.
        failure = exc
        metrics.add_received_processed_message_exception(
            exception_type=type(exc).__name__,
            **labels,
        )
        raise

    finally:
        # Runs on success and on failure alike, including exceptions that
        # are not ``Exception`` subclasses and therefore skip the clause above.
        elapsed = time.perf_counter() - started_at
        metrics.observe_received_processed_message_duration(
            duration=elapsed,
            **labels,
        )
        metrics.remove_received_message_in_process(amount=batch_size, **labels)

        ack_state = msg.committed
        if ack_state or failure:
            # Precedence: explicit ack state of the message, then the exact
            # type of the raised exception, then the generic error status.
            status = (
                PROCESSING_STATUS_BY_ACK_STATUS.get(ack_state)  # type: ignore[arg-type]
                or PROCESSING_STATUS_BY_HANDLER_EXCEPTION_MAP.get(type(failure))
                or ProcessingStatus.error
            )
        else:
            # Nothing committed explicitly and nothing raised.
            status = ProcessingStatus.acked

        metrics.add_received_processed_message(
            amount=batch_size,
            status=status,
            **labels,
        )

    return outcome