LogicHandler(
queue: RabbitQueue,
log_context_builder: Callable[
[StreamMessage[Any]], Dict[str, str]
],
exchange: Optional[RabbitExchange] = None,
consume_args: Optional[AnyDict] = None,
description: Optional[str] = None,
title: Optional[str] = None,
virtual_host: str = "/",
)
Bases: AsyncHandler[IncomingMessage]
, BaseRMQInformation
A class to handle logic for RabbitMQ message consumption.
METHOD | DESCRIPTION |
__init__ | Initializes the LogicHandler object |
add_call | Adds a call to be handled by the LogicHandler |
start | Starts consuming messages from the queue |
close | Closes the consumer and cancels message consumption |
Note
The above docstring is autogenerated by docstring-gen library (https://docstring-gen.airt.ai)
Initialize a RabbitMQ consumer.
PARAMETER | DESCRIPTION |
queue | RabbitQueue object representing the queue to consume from TYPE: RabbitQueue |
exchange | RabbitExchange object representing the exchange to bind the queue to (optional) TYPE: Optional[RabbitExchange] DEFAULT: None |
consume_args | Additional arguments for consuming from the queue (optional) TYPE: Optional[AnyDict] DEFAULT: None |
description | Description of the consumer (optional) TYPE: Optional[str] DEFAULT: None |
title | Title of the consumer (optional) TYPE: Optional[str] DEFAULT: None |
Note
The above docstring is autogenerated by docstring-gen library (https://docstring-gen.airt.ai)
Source code in faststream/rabbit/handler.py
| def __init__(
self,
queue: RabbitQueue,
log_context_builder: Callable[[StreamMessage[Any]], Dict[str, str]],
# RMQ information
exchange: Optional[RabbitExchange] = None,
consume_args: Optional[AnyDict] = None,
# AsyncAPI information
description: Optional[str] = None,
title: Optional[str] = None,
virtual_host: str = "/",
):
"""Initialize a RabbitMQ consumer.
Args:
queue: RabbitQueue object representing the queue to consume from
exchange: RabbitExchange object representing the exchange to bind the queue to (optional)
consume_args: Additional arguments for consuming from the queue (optional)
description: Description of the consumer (optional)
title: Title of the consumer (optional)
!!! note
The above docstring is autogenerated by docstring-gen library (https://docstring-gen.airt.ai)
"""
super().__init__(
log_context_builder=log_context_builder,
description=description,
title=title,
)
self.queue = queue
self.exchange = exchange
self.virtual_host = virtual_host
self.consume_args = consume_args or {}
self._consumer_tag = None
self._queue_obj = None
|
calls instance-attribute
calls: List[
Tuple[
HandlerCallWrapper[MsgType, Any, SendableMessage],
Callable[[StreamMessage[MsgType]], Awaitable[bool]],
AsyncParser[MsgType, Any],
AsyncDecoder[StreamMessage[MsgType]],
Sequence[Callable[[Any], BaseMiddleware]],
CallModel[Any, SendableMessage],
]
]
consume_args instance-attribute
consume_args: AnyDict = consume_args or {}
description property
description: Optional[str]
exchange instance-attribute
exchange: Optional[RabbitExchange] = exchange
global_middlewares instance-attribute
global_middlewares: Sequence[
Callable[[Any], BaseMiddleware]
] = []
log_context_builder instance-attribute
log_context_builder = log_context_builder
queue instance-attribute
queue: RabbitQueue = queue
virtual_host instance-attribute
virtual_host = virtual_host
add_call
add_call(
*,
handler: HandlerCallWrapper[
aio_pika.IncomingMessage,
P_HandlerParams,
T_HandlerReturn,
],
dependant: CallModel[P_HandlerParams, T_HandlerReturn],
parser: Optional[
CustomParser[
aio_pika.IncomingMessage, RabbitMessage
]
],
decoder: Optional[CustomDecoder[RabbitMessage]],
filter: Filter[RabbitMessage],
middlewares: Optional[
Sequence[
Callable[
[aio_pika.IncomingMessage], BaseMiddleware
]
]
]
) -> None
Add a call to the handler.
PARAMETER | DESCRIPTION |
handler | The handler for the call. TYPE: HandlerCallWrapper[IncomingMessage, P_HandlerParams, T_HandlerReturn] |
dependant | The dependant for the call. TYPE: CallModel[P_HandlerParams, T_HandlerReturn] |
parser | Optional custom parser for the call. TYPE: Optional[CustomParser[IncomingMessage, RabbitMessage]] |
decoder | Optional custom decoder for the call. TYPE: Optional[CustomDecoder[RabbitMessage]] |
filter | TYPE: Filter[RabbitMessage] |
middlewares | Optional sequence of middlewares for the call. TYPE: Optional[Sequence[Callable[[IncomingMessage], BaseMiddleware]]] |
Note
The above docstring is autogenerated by docstring-gen library (https://docstring-gen.airt.ai)
Source code in faststream/rabbit/handler.py
| def add_call(
self,
*,
handler: HandlerCallWrapper[
aio_pika.IncomingMessage, P_HandlerParams, T_HandlerReturn
],
dependant: CallModel[P_HandlerParams, T_HandlerReturn],
parser: Optional[CustomParser[aio_pika.IncomingMessage, RabbitMessage]],
decoder: Optional[CustomDecoder[RabbitMessage]],
filter: Filter[RabbitMessage],
middlewares: Optional[
Sequence[Callable[[aio_pika.IncomingMessage], BaseMiddleware]]
],
) -> None:
"""Add a call to the handler.
Args:
handler: The handler for the call.
dependant: The dependant for the call.
parser: Optional custom parser for the call.
decoder: Optional custom decoder for the call.
filter: The filter for the call.
middlewares: Optional sequence of middlewares for the call.
Returns:
None
!!! note
The above docstring is autogenerated by docstring-gen library (https://docstring-gen.airt.ai)
"""
super().add_call(
handler=handler,
parser=resolve_custom_func(parser, AioPikaParser.parse_message),
decoder=resolve_custom_func(decoder, AioPikaParser.decode_message),
filter=filter, # type: ignore[arg-type]
dependant=dependant,
middlewares=middlewares,
)
|
close async
Source code in faststream/rabbit/handler.py
| async def close(self) -> None:
if self._queue_obj is not None:
if self._consumer_tag is not None: # pragma: no branch
await self._queue_obj.cancel(self._consumer_tag)
self._consumer_tag = None
self._queue_obj = None
|
consume async
consume(msg: MsgType) -> SendableMessage
Consume a message asynchronously.
PARAMETER | DESCRIPTION |
msg | The message to be consumed. TYPE: MsgType |
RETURNS | DESCRIPTION |
SendableMessage | |
RAISES | DESCRIPTION |
StopConsume | If the consumption needs to be stopped. |
RAISES | DESCRIPTION |
Exception | If an error occurs during consumption. |
Note
The above docstring is autogenerated by docstring-gen library (https://docstring-gen.airt.ai)
Source code in faststream/broker/handler.py
| @override
async def consume(self, msg: MsgType) -> SendableMessage: # type: ignore[override]
"""Consume a message asynchronously.
Args:
msg: The message to be consumed.
Returns:
The sendable message.
Raises:
StopConsume: If the consumption needs to be stopped.
Raises:
Exception: If an error occurs during consumption.
!!! note
The above docstring is autogenerated by docstring-gen library (https://docstring-gen.airt.ai)
"""
result: Optional[WrappedReturn[SendableMessage]] = None
result_msg: SendableMessage = None
async with AsyncExitStack() as stack:
gl_middlewares: List[BaseMiddleware] = []
stack.enter_context(context.scope("handler_", self))
for m in self.global_middlewares:
gl_middlewares.append(await stack.enter_async_context(m(msg)))
logged = False
processed = False
for handler, filter_, parser, decoder, middlewares, _ in self.calls:
local_middlewares: List[BaseMiddleware] = []
for local_m in middlewares:
local_middlewares.append(
await stack.enter_async_context(local_m(msg))
)
all_middlewares = gl_middlewares + local_middlewares
# TODO: add parser & decoder caches
message = await parser(msg)
if not logged: # pragma: no branch
log_context_tag = context.set_local(
"log_context", self.log_context_builder(message)
)
message.decoded_body = await decoder(message)
message.processed = processed
if await filter_(message):
assert ( # nosec B101
not processed
), "You can't proccess a message with multiple consumers"
try:
async with AsyncExitStack() as consume_stack:
for m_consume in all_middlewares:
message.decoded_body = (
await consume_stack.enter_async_context(
m_consume.consume_scope(message.decoded_body)
)
)
result = await cast(
Awaitable[Optional[WrappedReturn[SendableMessage]]],
handler.call_wrapped(message),
)
if result is not None:
result_msg, pub_response = result
# TODO: suppress all publishing errors and raise them after all publishers will be tried
for publisher in (pub_response, *handler._publishers):
if publisher is not None:
async with AsyncExitStack() as pub_stack:
result_to_send = result_msg
for m_pub in all_middlewares:
result_to_send = (
await pub_stack.enter_async_context(
m_pub.publish_scope(result_to_send)
)
)
await publisher.publish(
message=result_to_send,
correlation_id=message.correlation_id,
)
except StopConsume:
await self.close()
handler.trigger()
except HandlerException as e: # pragma: no cover
handler.trigger()
raise e
except Exception as e:
handler.trigger(error=e)
raise e
else:
handler.trigger(result=result[0] if result else None)
message.processed = processed = True
if IS_OPTIMIZED: # pragma: no cover
break
assert processed, "You have to consume message" # nosec B101
context.reset_local("log_context", log_context_tag)
return result_msg
|
get_payloads
get_payloads() -> List[Tuple[AnyDict, str]]
Source code in faststream/broker/handler.py
| def get_payloads(self) -> List[Tuple[AnyDict, str]]:
payloads: List[Tuple[AnyDict, str]] = []
for h, _, _, _, _, dep in self.calls:
body = parse_handler_params(
dep, prefix=f"{self._title or self.call_name}:Message"
)
payloads.append((body, to_camelcase(unwrap(h._original_call).__name__)))
return payloads
|
name
Source code in faststream/asyncapi/base.py
| @abstractproperty
def name(self) -> str:
raise NotImplementedError()
|
schema
schema() -> Dict[str, Channel]
Source code in faststream/asyncapi/base.py
| def schema(self) -> Dict[str, Channel]: # pragma: no cover
return {}
|
start async
start(declarer: RabbitDeclarer) -> None
Starts the consumer for the RabbitMQ queue.
PARAMETER | DESCRIPTION |
declarer | RabbitDeclarer object used to declare the queue and exchange TYPE: RabbitDeclarer |
Note
The above docstring is autogenerated by docstring-gen library (https://docstring-gen.airt.ai)
Source code in faststream/rabbit/handler.py
| @override
async def start(self, declarer: RabbitDeclarer) -> None: # type: ignore[override]
"""Starts the consumer for the RabbitMQ queue.
Args:
declarer: RabbitDeclarer object used to declare the queue and exchange
Returns:
None
!!! note
The above docstring is autogenerated by docstring-gen library (https://docstring-gen.airt.ai)
"""
self._queue_obj = queue = await declarer.declare_queue(self.queue)
if self.exchange is not None:
exchange = await declarer.declare_exchange(self.exchange)
await queue.bind(
exchange,
routing_key=self.queue.routing,
arguments=self.queue.bind_arguments,
)
self._consumer_tag = await queue.consume(
# NOTE: aio-pika expects AbstractIncomingMessage, not IncomingMessage
self.consume, # type: ignore[arg-type]
arguments=self.consume_args,
)
|