LogicHandler(
queue: RabbitQueue,
log_context_builder: Callable[
[StreamMessage[Any]], Dict[str, str]
],
graceful_timeout: Optional[float] = None,
exchange: Optional[RabbitExchange] = None,
consume_args: Optional[AnyDict] = None,
description: Optional[str] = None,
title: Optional[str] = None,
include_in_schema: bool = True,
virtual_host: str = "/",
)
Bases: AsyncHandler[IncomingMessage], BaseRMQInformation
A class to handle logic for RabbitMQ message consumption.
METHOD | DESCRIPTION |
__init__ | Initializes the LogicHandler object |
add_call | Adds a call to be handled by the LogicHandler |
start | Starts consuming messages from the queue |
close | Closes the consumer and cancels message consumption |
Initialize a RabbitMQ consumer.
PARAMETER | DESCRIPTION |
queue | RabbitQueue object representing the queue to consume from TYPE: RabbitQueue |
log_context_builder | Callable that returns a dictionary with log context information TYPE: Callable[[StreamMessage[Any]], Dict[str, str]] |
graceful_timeout | Optional float representing the graceful timeout TYPE: Optional[float] DEFAULT: None |
exchange | RabbitExchange object representing the exchange to bind the queue to (optional) TYPE: Optional[RabbitExchange] DEFAULT: None |
consume_args | Additional arguments for consuming from the queue (optional) TYPE: Optional[AnyDict] DEFAULT: None |
description | Description of the consumer (optional) TYPE: Optional[str] DEFAULT: None |
title | Title of the consumer (optional) TYPE: Optional[str] DEFAULT: None |
include_in_schema | Whether to include the consumer in the API specification (optional) TYPE: bool DEFAULT: True |
virtual_host | Virtual host to connect to (optional) TYPE: str DEFAULT: '/' |
Source code in faststream/rabbit/handler.py
def __init__(
    self,
    queue: RabbitQueue,
    log_context_builder: Callable[[StreamMessage[Any]], Dict[str, str]],
    graceful_timeout: Optional[float] = None,
    # RMQ information
    exchange: Optional[RabbitExchange] = None,
    consume_args: Optional[AnyDict] = None,
    # AsyncAPI information
    description: Optional[str] = None,
    title: Optional[str] = None,
    include_in_schema: bool = True,
    virtual_host: str = "/",
) -> None:
    """Set up a handler that consumes from a RabbitMQ queue.

    Args:
        queue: Queue the handler consumes from.
        log_context_builder: Callable producing the log-context mapping for a message.
        graceful_timeout: Graceful timeout, forwarded to the base handler.
        exchange: Exchange the queue is bound to when the handler starts (optional).
        consume_args: Extra consume arguments; an empty dict is stored when omitted.
        description: Description of the consumer, forwarded to the base handler (optional).
        title: Title of the consumer, forwarded to the base handler (optional).
        include_in_schema: Whether the consumer appears in the API specification.
        virtual_host: Virtual host to connect to.
    """
    super().__init__(
        log_context_builder=log_context_builder,
        graceful_timeout=graceful_timeout,
        title=title,
        description=description,
        include_in_schema=include_in_schema,
    )

    # Runtime consumer state: nothing is declared or subscribed yet.
    self._queue_obj = None
    self._consumer_tag = None

    # RabbitMQ-side configuration.
    self.virtual_host = virtual_host
    self.exchange = exchange
    self.queue = queue
    self.consume_args = consume_args if consume_args else {}
call_name property
Returns the name of the handler call.
calls instance-attribute
calls: List[
Tuple[
HandlerCallWrapper[MsgType, Any, SendableMessage],
Callable[[StreamMessage[MsgType]], Awaitable[bool]],
AsyncParser[MsgType, Any],
AsyncDecoder[StreamMessage[MsgType]],
Sequence[Callable[[Any], BaseMiddleware]],
CallModel[Any, SendableMessage],
]
]
consume_args instance-attribute
description property
Returns the description of the handler.
exchange instance-attribute
global_middlewares instance-attribute
graceful_timeout instance-attribute
include_in_schema instance-attribute
log_context_builder instance-attribute
running instance-attribute
virtual_host instance-attribute
add_call
add_call(
*,
handler: HandlerCallWrapper[
IncomingMessage, P_HandlerParams, T_HandlerReturn
],
dependant: CallModel[P_HandlerParams, T_HandlerReturn],
parser: Optional[
CustomParser[IncomingMessage, RabbitMessage]
],
decoder: Optional[CustomDecoder[RabbitMessage]],
filter: Filter[RabbitMessage],
middlewares: Optional[
Sequence[
Callable[[IncomingMessage], BaseMiddleware]
]
]
) -> None
Add a call to the handler.
PARAMETER | DESCRIPTION |
handler | The handler for the call. TYPE: HandlerCallWrapper[IncomingMessage, P_HandlerParams, T_HandlerReturn] |
dependant | The dependant for the call. TYPE: CallModel[P_HandlerParams, T_HandlerReturn] |
parser | Optional custom parser for the call. TYPE: Optional[CustomParser[IncomingMessage, RabbitMessage]] |
decoder | Optional custom decoder for the call. TYPE: Optional[CustomDecoder[RabbitMessage]] |
filter | The filter for the call. TYPE: Filter[RabbitMessage] |
middlewares | Optional sequence of middlewares for the call. TYPE: Optional[Sequence[Callable[[IncomingMessage], BaseMiddleware]]] |
Source code in faststream/rabbit/handler.py
def add_call(
    self,
    *,
    handler: HandlerCallWrapper[
        aio_pika.IncomingMessage, P_HandlerParams, T_HandlerReturn
    ],
    dependant: CallModel[P_HandlerParams, T_HandlerReturn],
    parser: Optional[CustomParser[aio_pika.IncomingMessage, RabbitMessage]],
    decoder: Optional[CustomDecoder[RabbitMessage]],
    filter: Filter[RabbitMessage],
    middlewares: Optional[
        Sequence[Callable[[aio_pika.IncomingMessage], BaseMiddleware]]
    ],
) -> None:
    """Register a call on this handler.

    The optional custom ``parser`` and ``decoder`` are each passed through
    ``resolve_custom_func`` together with the matching ``AioPikaParser``
    function before being handed to the base implementation.

    Args:
        handler: The handler for the call.
        dependant: The dependant for the call.
        parser: Optional custom parser for the call.
        decoder: Optional custom decoder for the call.
        filter: The filter for the call.
        middlewares: Optional sequence of middlewares for the call.
    """
    resolved_parser = resolve_custom_func(parser, AioPikaParser.parse_message)
    resolved_decoder = resolve_custom_func(decoder, AioPikaParser.decode_message)

    super().add_call(
        handler=handler,
        dependant=dependant,
        parser=resolved_parser,
        decoder=resolved_decoder,
        filter=filter,  # type: ignore[arg-type]
        middlewares=middlewares,
    )
close async
Source code in faststream/rabbit/handler.py
async def close(self) -> None:
    """Close the handler and cancel the active queue consumer, if any.

    The base ``close`` runs first. If a queue object is stored, the
    registered consumer tag (when present) is cancelled on it, and both
    stored references are cleared.
    """
    await super().close()

    if self._queue_obj is None:
        # Nothing was declared, so there is no consumer to cancel.
        return

    if self._consumer_tag is not None:  # pragma: no branch
        await self._queue_obj.cancel(self._consumer_tag)
        self._consumer_tag = None

    self._queue_obj = None
consume async
consume(msg: MsgType) -> SendableMessage
Consume a message asynchronously.
PARAMETER | DESCRIPTION |
msg | The message to be consumed. TYPE: MsgType |
RETURNS | DESCRIPTION |
SendableMessage | The sendable message. |
RAISES | DESCRIPTION |
StopConsume | If the consumption needs to be stopped. |
Exception | If an error occurs during consumption. |
Source code in faststream/broker/handler.py
@override
async def consume(self, msg: MsgType) -> SendableMessage:  # type: ignore[override]
    """Consume a message asynchronously.

    The raw message is offered to every registered call in order. For each
    call it is parsed and decoded, and the call's filter decides whether
    its handler runs. The handler result (if any) is published through the
    response publisher and the handler's own publishers.

    Args:
        msg: The message to be consumed.

    Returns:
        The message produced by the handler that processed ``msg``, or
        ``None`` when the handler is not running or no result was produced.

    Raises:
        HandlerException: Re-raised after the handler has been triggered.
        Exception: Any other error raised during consumption is re-raised
            after the handler has been triggered with that error.
        AssertionError: If a second call's filter also accepts the message,
            or if no call processed it while the handler is still running.

    Note:
        ``StopConsume`` raised during processing is handled here: the
        handler is closed and the exception is not propagated.
    """
    result: Optional[WrappedReturn[SendableMessage]] = None
    result_msg: SendableMessage = None

    if not self.running:
        return result_msg

    log_context_tag: Optional["Token[Any]"] = None
    async with AsyncExitStack() as stack:
        stack.enter_context(self.lock)

        stack.enter_context(context.scope("handler_", self))

        gl_middlewares: List[BaseMiddleware] = [
            await stack.enter_async_context(m(msg)) for m in self.global_middlewares
        ]

        logged = False
        processed = False
        for handler, filter_, parser, decoder, middlewares, _ in self.calls:
            local_middlewares: List[BaseMiddleware] = [
                await stack.enter_async_context(m(msg)) for m in middlewares
            ]

            all_middlewares = gl_middlewares + local_middlewares

            # TODO: add parser & decoder caches
            message = await parser(msg)

            if not logged:
                log_context_tag = context.set_local(
                    "log_context",
                    self.log_context_builder(message),
                )
                # Set the log context only once per consumed message. Without
                # this, every call overwrote the token and the final reset
                # restored a previous call's context instead of the value
                # that was active before consumption started.
                logged = True

            message.decoded_body = await decoder(message)
            message.processed = processed

            if await filter_(message):
                assert (  # nosec B101
                    not processed
                ), "You can't process a message with multiple consumers"

                try:
                    # Consume scopes stay open while the handler itself runs.
                    async with AsyncExitStack() as consume_stack:
                        for m_consume in all_middlewares:
                            message.decoded_body = (
                                await consume_stack.enter_async_context(
                                    m_consume.consume_scope(message.decoded_body)
                                )
                            )

                        result = await cast(
                            Awaitable[Optional[WrappedReturn[SendableMessage]]],
                            handler.call_wrapped(message),
                        )

                    if result is not None:
                        result_msg, pub_response = result

                        # TODO: suppress all publishing errors and raise them after all publishers will be tried
                        for publisher in (pub_response, *handler._publishers):
                            if publisher is not None:
                                # Each publisher gets its own chain of publish scopes.
                                async with AsyncExitStack() as pub_stack:
                                    result_to_send = result_msg

                                    for m_pub in all_middlewares:
                                        result_to_send = (
                                            await pub_stack.enter_async_context(
                                                m_pub.publish_scope(result_to_send)
                                            )
                                        )

                                    await publisher.publish(
                                        message=result_to_send,
                                        correlation_id=message.correlation_id,
                                    )

                except StopConsume:
                    await self.close()
                    handler.trigger()

                except HandlerException as e:  # pragma: no cover
                    handler.trigger()
                    raise e

                except Exception as e:
                    handler.trigger(error=e)
                    raise e

                else:
                    handler.trigger(result=result[0] if result else None)
                    message.processed = processed = True
                    if IS_OPTIMIZED:  # pragma: no cover
                        break

        assert not self.running or processed, "You have to consume message"  # nosec B101

    if log_context_tag is not None:
        context.reset_local("log_context", log_context_tag)

    return result_msg
get_payloads
Get the payloads of the handler.
Source code in faststream/broker/handler.py
def get_payloads(self) -> List[Tuple[AnyDict, str]]:
    """Get the payloads of the handler.

    Returns:
        One ``(body, name)`` pair per registered call. ``body`` comes from
        ``parse_handler_params`` applied to the call's dependant, and
        ``name`` from ``to_camelcase`` applied to the ``__name__`` of the
        unwrapped original handler function.
    """
    return [
        (
            parse_handler_params(
                call_model, prefix=f"{self._title or self.call_name}:Message"
            ),
            to_camelcase(unwrap(wrapper._original_call).__name__),
        )
        for wrapper, _, _, _, _, call_model in self.calls
    ]
name
Returns the name of the API operation.
Source code in faststream/asyncapi/base.py
@abstractproperty
def name(self) -> str:
    """Name of the API operation.

    Abstract: this base implementation only raises ``NotImplementedError``.
    """
    raise NotImplementedError
schema
Returns the schema of the API operation as a dictionary of channel names and channel objects.
Source code in faststream/asyncapi/base.py
def schema(self) -> Dict[str, Channel]:  # pragma: no cover
    """Return the schema of the API operation as a mapping of channel names to channel objects.

    This base implementation has no channels and returns an empty dict.
    """
    channels: Dict[str, Channel] = {}
    return channels
start async
Starts the consumer for the RabbitMQ queue.
PARAMETER | DESCRIPTION |
declarer | RabbitDeclarer object used to declare the queue and exchange TYPE: RabbitDeclarer |
Source code in faststream/rabbit/handler.py
@override
async def start(self, declarer: RabbitDeclarer) -> None:  # type: ignore[override]
    """Declare the queue, bind it if needed, and begin consuming.

    The queue is declared through ``declarer`` and stored on the handler.
    When an exchange is configured it is declared as well, and the queue is
    bound to it with the queue's routing key and bind arguments.
    ``self.consume`` is then registered as the queue callback, the returned
    consumer tag is stored, and finally the base ``start`` runs.

    Args:
        declarer: RabbitDeclarer object used to declare the queue and exchange
    """
    declared_queue = await declarer.declare_queue(self.queue)
    self._queue_obj = declared_queue

    if self.exchange is not None:
        declared_exchange = await declarer.declare_exchange(self.exchange)
        await declared_queue.bind(
            declared_exchange,
            routing_key=self.queue.routing,
            arguments=self.queue.bind_arguments,
        )

    consumer_tag = await declared_queue.consume(
        # NOTE: aio-pika expects AbstractIncomingMessage, not IncomingMessage
        self.consume,  # type: ignore[arg-type]
        arguments=self.consume_args,
    )
    self._consumer_tag = consumer_tag

    await super().start()