SubscriberUsecase(*, no_ack, no_reply, retry, broker_dependencies, broker_middlewares, default_parser, default_decoder, title_, description_, include_in_schema)
Bases: AsyncAPIOperation, SubscriberProto[MsgType]
A class representing an asynchronous handler.
Initialize a new instance of the class.
Source code in faststream/broker/subscriber/usecase.py
def __init__(
    self,
    *,
    no_ack: bool,
    no_reply: bool,
    retry: Union[bool, int],
    broker_dependencies: Iterable["Depends"],
    broker_middlewares: Sequence["BrokerMiddleware[MsgType]"],
    default_parser: "AsyncCallable",
    default_decoder: "AsyncCallable",
    # AsyncAPI information
    title_: Optional[str],
    description_: Optional[str],
    include_in_schema: bool,
) -> None:
    """Record the subscriber configuration and reset its runtime state.

    All parameters are keyword-only and are stored on the instance
    unchanged. Attributes that can only be known once the subscriber is
    attached to a broker are initialised to placeholders here.

    Args:
        no_ack: Stored as ``_no_ack``.
        no_reply: Stored as ``_no_reply``.
        retry: Stored as ``_retry``.
        broker_dependencies: Stored as ``_broker_dependencies``.
        broker_middlewares: Stored as ``_broker_middlewares``.
        default_parser: Initial value of ``_parser``.
        default_decoder: Initial value of ``_decoder``.
        title_: AsyncAPI title, stored as ``title_``.
        description_: AsyncAPI description, stored as ``description_``.
        include_in_schema: Stored as ``include_in_schema``.
    """
    # AsyncAPI metadata
    self.include_in_schema = include_in_schema
    self.description_ = description_
    self.title_ = title_

    # Message parsing defaults and reply behaviour
    self._decoder = default_decoder
    self._parser = default_parser
    self._no_reply = no_reply

    # Watcher args
    self._retry = retry
    self._no_ack = no_ack

    # Handler registration state; nothing is registered yet
    self.calls = []
    self._call_decorators = ()
    self._call_options = None

    # Setup in include
    self._broker_middlewares = broker_middlewares
    self._broker_dependencies = broker_dependencies

    # Lifecycle state: not running, with a placeholder lock context
    self.lock = sync_fake_context()
    self.running = False

    # register in setup later
    self.extra_watcher_options = {}
    self.extra_context = {}
    self.graceful_timeout = None
    self._producer = None
|
name property
Returns the name of the API operation.
description property
Returns the description of the API operation.
running instance-attribute
graceful_timeout instance-attribute
extra_watcher_options = {}
title_ instance-attribute
description_ instance-attribute
include_in_schema instance-attribute
call_name property
Returns the name of the handler call.
add_prefix abstractmethod
Source code in faststream/broker/proto.py
@abstractmethod
def add_prefix(self, prefix: str) -> None:
    """Abstract hook that receives a string ``prefix``.

    The base declaration does nothing and returns ``None``; what gets
    prefixed is decided by the concrete implementation.
    """
|
schema
Returns the schema of the API operation as a dictionary of channel names and channel objects.
Source code in faststream/asyncapi/abc.py
def schema(self) -> Dict[str, Channel]:
    """Return the operation's channels keyed by channel name.

    Returns:
        The result of ``get_schema()`` when ``include_in_schema`` is truthy,
        otherwise an empty dictionary.
    """
    # Operations excluded from the schema contribute no channels.
    if not self.include_in_schema:
        return {}
    return self.get_schema()
|
get_one abstractmethod async
Source code in faststream/broker/subscriber/proto.py
@abstractmethod
async def get_one(
    self,
    *,
    timeout: float = 5.0,
) -> "Optional[StreamMessage[MsgType]]":
    """Abstract coroutine that yields one message or ``None``.

    The base declaration has no behaviour and resolves to ``None``.

    Args:
        timeout: Keyword-only, defaults to ``5.0``. The unit is not shown
            here; presumably seconds (to be confirmed against a concrete
            implementation).
    """
|
get_name abstractmethod
Name property fallback.
Source code in faststream/asyncapi/abc.py
@abstractmethod
def get_name(self) -> str:
    """Name property fallback.

    Raises:
        NotImplementedError: Always, in this base declaration; concrete
            classes are expected to override it.
    """
    raise NotImplementedError
|
get_schema abstractmethod
Generate AsyncAPI schema.
Source code in faststream/asyncapi/abc.py
@abstractmethod
def get_schema(self) -> Dict[str, Channel]:
    """Generate AsyncAPI schema.

    Raises:
        NotImplementedError: Always, in this base declaration; concrete
            classes are expected to override it.
    """
    raise NotImplementedError
|
add_middleware
add_middleware(middleware)
Source code in faststream/broker/subscriber/usecase.py
def add_middleware(self, middleware: "BrokerMiddleware[MsgType]") -> None:
    """Append ``middleware`` to the subscriber's broker middlewares.

    The existing collection is not mutated; ``_broker_middlewares`` is
    rebound to a new tuple with ``middleware`` as its last element.
    """
    current = tuple(self._broker_middlewares)
    self._broker_middlewares = current + (middleware,)
|
setup
setup(*, logger, producer, graceful_timeout, extra_context, broker_parser, broker_decoder, apply_types, is_validate, _get_dependant, _call_decorators)
Source code in faststream/broker/subscriber/usecase.py
@override
def setup(  # type: ignore[override]
    self,
    *,
    logger: Optional["LoggerProto"],
    producer: Optional["ProducerProto"],
    graceful_timeout: Optional[float],
    extra_context: "AnyDict",
    # broker options
    broker_parser: Optional["CustomCallable"],
    broker_decoder: Optional["CustomCallable"],
    # dependant args
    apply_types: bool,
    is_validate: bool,
    _get_dependant: Optional[Callable[..., Any]],
    _call_decorators: Iterable["Decorator"],
) -> None:
    """Attach runtime objects and prepare every registered call.

    Replaces the placeholder lock with a ``MultiLock``, stores the
    producer, graceful timeout and extra context, builds the watcher from
    ``logger`` and the stored ``_no_ack`` / ``_retry`` values, then sets up
    each entry of ``self.calls``.

    Args:
        logger: Forwarded to ``get_watcher_context``.
        producer: Stored as ``_producer``.
        graceful_timeout: Stored as ``graceful_timeout``.
        extra_context: Stored as ``extra_context``.
        broker_parser: Used for a call that has no ``item_parser``.
        broker_decoder: Used for a call that has no ``item_decoder``.
        apply_types: Forwarded to each ``call.setup``.
        is_validate: Forwarded to each ``call.setup``.
        _get_dependant: Forwarded to each ``call.setup``.
        _call_decorators: Appended after the subscriber's own
            ``_call_decorators`` for each ``call.setup``.
    """
    self.lock = MultiLock()
    self._producer = producer
    self.graceful_timeout = graceful_timeout
    self.extra_context = extra_context
    self.watcher = get_watcher_context(logger, self._no_ack, self._retry)

    for call in self.calls:
        # A call-level parser takes precedence over the broker-level one;
        # with neither, the subscriber's current parser is used as is.
        custom_parser = call.item_parser or broker_parser
        if custom_parser:
            resolved_parser = resolve_custom_func(
                to_async(custom_parser), self._parser
            )
        else:
            resolved_parser = self._parser

        # Same precedence rule for the decoder.
        custom_decoder = call.item_decoder or broker_decoder
        if custom_decoder:
            resolved_decoder = resolve_custom_func(
                to_async(custom_decoder), self._decoder
            )
        else:
            resolved_decoder = self._decoder

        # The resolved pair becomes the subscriber's current parser and
        # decoder, so later iterations start from these values.
        self._parser, self._decoder = resolved_parser, resolved_decoder

        call.setup(
            parser=resolved_parser,
            decoder=resolved_decoder,
            apply_types=apply_types,
            is_validate=is_validate,
            _get_dependant=_get_dependant,
            _call_decorators=(*self._call_decorators, *_call_decorators),
            broker_dependencies=self._broker_dependencies,
        )

        call.handler.refresh(with_mock=False)
|
start abstractmethod async
Start the handler.
Source code in faststream/broker/subscriber/usecase.py
@abstractmethod
async def start(self) -> None:
    """Start the handler.

    The base implementation only sets ``running`` to ``True``.
    """
    self.running = True
|
close abstractmethod async
Close the handler.
Blocks event loop up to graceful_timeout seconds.
Source code in faststream/broker/subscriber/usecase.py
@abstractmethod
async def close(self) -> None:
    """Close the handler.

    Blocks event loop up to graceful_timeout seconds.

    ``running`` is cleared first. The wait only happens when ``lock`` is a
    ``MultiLock``; any other lock object is left alone.
    """
    self.running = False

    lock = self.lock
    if not isinstance(lock, MultiLock):
        return

    await lock.wait_release(self.graceful_timeout)
|
add_call
add_call(*, filter_, parser_, decoder_, middlewares_, dependencies_)
Source code in faststream/broker/subscriber/usecase.py
def add_call(
    self,
    *,
    filter_: "Filter[Any]",
    parser_: Optional["CustomCallable"],
    decoder_: Optional["CustomCallable"],
    middlewares_: Sequence["SubscriberMiddleware[Any]"],
    dependencies_: Iterable["Depends"],
) -> Self:
    """Store the options for a call and return the subscriber itself.

    The five keyword-only arguments are bundled into a ``_CallOptions``
    object that replaces any previously stored ``_call_options``.

    Returns:
        ``self``.
    """
    options = _CallOptions(
        filter=filter_,
        parser=parser_,
        decoder=decoder_,
        middlewares=middlewares_,
        dependencies=dependencies_,
    )
    self._call_options = options
    return self
|
consume async
Consume a message asynchronously.
Source code in faststream/broker/subscriber/usecase.py
async def consume(self, msg: MsgType) -> Any:
    """Consume a message asynchronously.

    Returns:
        The result of ``process_message(msg)`` on success. ``None`` when
        the subscriber is not running or when processing raised
        ``StopConsume``, ``SystemExit`` or any other ``Exception``.
    """
    if self.running:
        try:
            return await self.process_message(msg)

        except StopConsume:
            # Stop handler at StopConsume exception
            await self.close()

        except SystemExit:
            # Stop handler at `exit()` call
            await self.close()

            app = context.get("app")
            if app:
                app.exit()

        except Exception:  # nosec B110
            # All other exceptions were logged by CriticalLogMiddleware
            pass

    return None
|
process_message async
Execute all message processing stages.
Source code in faststream/broker/subscriber/usecase.py
async def process_message(self, msg: MsgType) -> "Response":
    """Execute all message processing stages.

    Stages, in order, all inside one ``AsyncExitStack``:

    1. Enter ``self.lock`` and the context scopes (``extra_context`` items
       and ``"handler_"``).
    2. Instantiate every broker middleware with ``msg`` and await its
       ``__aenter__``.
    3. Ask each registered call, in order, whether it is suitable for the
       message. The first one that returns a non-``None`` message handles
       it.
    4. For that call: enter the watcher and message scopes, run the call,
       publish the result through the response publisher and the handler's
       publishers, and return the result.

    Returns:
        The handler result wrapped by ``ensure_response``, or
        ``ensure_response(None)`` when an exception was raised inside the
        stack and suppressed while the stack unwound.

    Raises:
        Exception: The error raised by ``is_suitable`` of a call, re-raised
            after the middlewares' exit callbacks are registered.
        SubscriberNotFound: When no registered call accepted the message.
    """
    async with AsyncExitStack() as stack:
        stack.enter_context(self.lock)

        # Enter context before middlewares
        for k, v in self.extra_context.items():
            stack.enter_context(context.scope(k, v))

        stack.enter_context(context.scope("handler_", self))

        # enter all middlewares
        # NOTE: ``__aexit__`` is registered on the stack later, not here, so
        # the middlewares exit before the scopes entered further down.
        middlewares: List[BaseMiddleware] = []
        for base_m in self._broker_middlewares:
            middleware = base_m(msg)
            middlewares.append(middleware)
            await middleware.__aenter__()

        # ``cache`` is shared across all ``is_suitable`` calls for this message.
        cache: Dict[Any, Any] = {}
        parsing_error: Optional[Exception] = None
        for h in self.calls:
            try:
                message = await h.is_suitable(msg, cache)
            except Exception as e:
                # Remember the failure and stop probing the remaining calls;
                # it is re-raised below once the middleware exits are pushed.
                parsing_error = e
                break

            if message is not None:
                # Acknowledgement scope
                # TODO: move it to scope enter at `retry` option deprecation
                await stack.enter_async_context(
                    self.watcher(
                        message,
                        **self.extra_watcher_options,
                    )
                )

                stack.enter_context(
                    context.scope("log_context", self.get_log_context(message))
                )
                stack.enter_context(context.scope("message", message))

                # Middlewares should be exited before scope release
                for m in middlewares:
                    stack.push_async_exit(m.__aexit__)

                result_msg = ensure_response(
                    await h.call(
                        message=message,
                        # consumer middlewares
                        _extra_middlewares=(
                            m.consume_scope for m in middlewares[::-1]
                        ),
                    )
                )

                # Propagate the incoming correlation id unless the handler
                # result already carries its own.
                if not result_msg.correlation_id:
                    result_msg.correlation_id = message.correlation_id

                # Response publisher(s) first, then the handler's publishers.
                for p in chain(
                    self.__get_response_publisher(message),
                    h.handler._publishers,
                ):
                    await p.publish(
                        result_msg.body,
                        **result_msg.as_publish_kwargs(),
                        # publisher middlewares
                        _extra_middlewares=[
                            m.publish_scope for m in middlewares[::-1]
                        ],
                    )

                # Return data for tests
                return result_msg

        # Suitable handler was not found or
        # parsing/decoding exception occurred
        for m in middlewares:
            stack.push_async_exit(m.__aexit__)

        if parsing_error:
            raise parsing_error

        else:
            raise SubscriberNotFound(f"There is no suitable handler for {msg=}")

    # An error was raised and processed by some middleware
    # (reachable only when an exit callback on the stack suppressed it).
    return ensure_response(None)
|
get_log_context
Generate log context.
Source code in faststream/broker/subscriber/usecase.py
def get_log_context(
    self,
    message: Optional["StreamMessage[MsgType]"],
) -> Dict[str, str]:
    """Generate log context.

    Returns:
        A one-key dictionary whose ``"message_id"`` value is the message's
        ``message_id`` attribute, or an empty string when ``message`` has no
        such attribute (for example when it is ``None``).
    """
    message_id = getattr(message, "message_id", "")
    return {"message_id": message_id}
|
get_description
Returns the description of the handler.
Source code in faststream/broker/subscriber/usecase.py
def get_description(self) -> Optional[str]:
    """Returns the description of the handler.

    Returns:
        The ``description`` of the first registered call, or ``None`` when
        no calls are registered.
    """
    if self.calls:
        return self.calls[0].description
    return None  # pragma: no cover
|
get_payloads
Get the payloads of the handler.
Source code in faststream/broker/subscriber/usecase.py
def get_payloads(self) -> List[Tuple["AnyDict", str]]:
    """Get the payloads of the handler.

    Returns:
        One ``(schema, name)`` pair per registered call, where ``schema``
        comes from ``parse_handler_params`` and ``name`` is the camel-cased
        call name. With no registered calls, a single placeholder pair
        titled after the subscriber itself.

    Raises:
        SetupError: When a registered call has no ``dependant`` yet.
    """
    if not self.calls:
        # Nothing registered: describe the subscriber with a bare title.
        placeholder = {
            "title": f"{self.title_ or self.call_name}:Message:Payload",
        }
        return [(placeholder, to_camelcase(self.call_name))]

    collected: List[Tuple[AnyDict, str]] = []
    for handler in self.calls:
        if handler.dependant is None:
            raise SetupError("You should setup `Handler` at first.")

        schema_body = parse_handler_params(
            handler.dependant,
            prefix=f"{self.title_ or self.call_name}:Message",
        )
        collected.append((schema_body, to_camelcase(handler.call_name)))

    return collected
|