
Handler

faststream.rabbit.asyncapi.Handler #

Handler(
    queue: RabbitQueue,
    log_context_builder: Callable[
        [StreamMessage[Any]], Dict[str, str]
    ],
    exchange: Optional[RabbitExchange] = None,
    consume_args: Optional[AnyDict] = None,
    description: Optional[str] = None,
    title: Optional[str] = None,
    virtual_host: str = "/",
)

Bases: LogicHandler

A handler that combines the RabbitMQ AsyncAPI channel schema with the LogicHandler consumption logic.

METHOD DESCRIPTION
- name: Returns the name of the handler.
- get_payloads: Returns a list of payloads.

Note

The above docstring is autogenerated by docstring-gen library (https://docstring-gen.airt.ai)

Initialize a RabbitMQ consumer.

PARAMETER DESCRIPTION
- queue (RabbitQueue): the queue to consume from
- log_context_builder (Callable[[StreamMessage[Any]], Dict[str, str]]): callable that builds the per-message logging context
- exchange (Optional[RabbitExchange], default None): the exchange to bind the queue to
- consume_args (Optional[AnyDict], default None): additional arguments for consuming from the queue
- description (Optional[str], default None): description of the consumer
- title (Optional[str], default None): title of the consumer
- virtual_host (str, default "/"): RabbitMQ virtual host used for the AsyncAPI bindings

Note

The above docstring is autogenerated by docstring-gen library (https://docstring-gen.airt.ai)

Source code in faststream/rabbit/handler.py
def __init__(
    self,
    queue: RabbitQueue,
    log_context_builder: Callable[[StreamMessage[Any]], Dict[str, str]],
    # RMQ information
    exchange: Optional[RabbitExchange] = None,
    consume_args: Optional[AnyDict] = None,
    # AsyncAPI information
    description: Optional[str] = None,
    title: Optional[str] = None,
    virtual_host: str = "/",
):
    """Initialize a RabbitMQ consumer.

    Args:
        queue: RabbitQueue object representing the queue to consume from
        exchange: RabbitExchange object representing the exchange to bind the queue to (optional)
        consume_args: Additional arguments for consuming from the queue (optional)
        description: Description of the consumer (optional)
        title: Title of the consumer (optional)
    !!! note

        The above docstring is autogenerated by docstring-gen library (https://docstring-gen.airt.ai)
    """
    super().__init__(
        log_context_builder=log_context_builder,
        description=description,
        title=title,
    )

    self.queue = queue
    self.exchange = exchange
    self.virtual_host = virtual_host
    self.consume_args = consume_args or {}

    self._consumer_tag = None
    self._queue_obj = None
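
The handler is normally constructed by the RabbitMQ broker internals rather than by hand, but a minimal construction sketch helps make the parameters concrete. The queue and exchange names, the build_log_context helper, and the x-priority consumer argument below are illustrative assumptions, not values taken from this page:

from typing import Any, Dict

from faststream.broker.message import StreamMessage
from faststream.rabbit import RabbitExchange, RabbitQueue
from faststream.rabbit.asyncapi import Handler


def build_log_context(message: StreamMessage[Any]) -> Dict[str, str]:
    # Hypothetical builder: expose the message id to the logging context.
    return {"message_id": message.message_id}


handler = Handler(
    queue=RabbitQueue("orders"),          # queue to consume from
    log_context_builder=build_log_context,
    exchange=RabbitExchange("events"),    # optional exchange to bind the queue to
    consume_args={"x-priority": 10},      # AMQP consumer arguments, forwarded as
                                          # arguments= to aio_pika's queue.consume()
    description="Consumes order events",  # AsyncAPI description
    title="OrdersHandler",                # AsyncAPI title (also used as the channel key)
    virtual_host="/",
)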

call_name property #

call_name: str

calls instance-attribute #

calls: List[
    Tuple[
        HandlerCallWrapper[MsgType, Any, SendableMessage],
        Callable[[StreamMessage[MsgType]], Awaitable[bool]],
        AsyncParser[MsgType, Any],
        AsyncDecoder[StreamMessage[MsgType]],
        Sequence[Callable[[Any], BaseMiddleware]],
        CallModel[Any, SendableMessage],
    ]
]

consume_args instance-attribute #

consume_args: AnyDict = consume_args or {}

description property #

description: Optional[str]

exchange instance-attribute #

exchange: Optional[RabbitExchange] = exchange

global_middlewares instance-attribute #

global_middlewares: Sequence[
    Callable[[Any], BaseMiddleware]
] = []

log_context_builder instance-attribute #

log_context_builder = log_context_builder

queue instance-attribute #

queue: RabbitQueue = queue

virtual_host instance-attribute #

virtual_host = virtual_host

add_call #

add_call(
    *,
    handler: HandlerCallWrapper[
        aio_pika.IncomingMessage,
        P_HandlerParams,
        T_HandlerReturn,
    ],
    dependant: CallModel[P_HandlerParams, T_HandlerReturn],
    parser: Optional[
        CustomParser[
            aio_pika.IncomingMessage, RabbitMessage
        ]
    ],
    decoder: Optional[CustomDecoder[RabbitMessage]],
    filter: Filter[RabbitMessage],
    middlewares: Optional[
        Sequence[
            Callable[
                [aio_pika.IncomingMessage], BaseMiddleware
            ]
        ]
    ]
) -> None

Add a call to the handler.

PARAMETER DESCRIPTION
- handler (HandlerCallWrapper[IncomingMessage, P_HandlerParams, T_HandlerReturn]): the handler for the call
- dependant (CallModel[P_HandlerParams, T_HandlerReturn]): the dependant for the call
- parser (Optional[CustomParser[IncomingMessage, RabbitMessage]]): optional custom parser for the call
- decoder (Optional[CustomDecoder[RabbitMessage]]): optional custom decoder for the call
- filter (Filter[RabbitMessage]): the filter for the call
- middlewares (Optional[Sequence[Callable[[IncomingMessage], BaseMiddleware]]]): optional sequence of middlewares for the call

RETURNS DESCRIPTION
- None

Note

The above docstring is autogenerated by docstring-gen library (https://docstring-gen.airt.ai)

Source code in faststream/rabbit/handler.py
def add_call(
    self,
    *,
    handler: HandlerCallWrapper[
        aio_pika.IncomingMessage, P_HandlerParams, T_HandlerReturn
    ],
    dependant: CallModel[P_HandlerParams, T_HandlerReturn],
    parser: Optional[CustomParser[aio_pika.IncomingMessage, RabbitMessage]],
    decoder: Optional[CustomDecoder[RabbitMessage]],
    filter: Filter[RabbitMessage],
    middlewares: Optional[
        Sequence[Callable[[aio_pika.IncomingMessage], BaseMiddleware]]
    ],
) -> None:
    """Add a call to the handler.

    Args:
        handler: The handler for the call.
        dependant: The dependant for the call.
        parser: Optional custom parser for the call.
        decoder: Optional custom decoder for the call.
        filter: The filter for the call.
        middlewares: Optional sequence of middlewares for the call.

    Returns:
        None
    !!! note

        The above docstring is autogenerated by docstring-gen library (https://docstring-gen.airt.ai)
    """
    super().add_call(
        handler=handler,
        parser=resolve_custom_func(parser, AioPikaParser.parse_message),
        decoder=resolve_custom_func(decoder, AioPikaParser.decode_message),
        filter=filter,  # type: ignore[arg-type]
        dependant=dependant,
        middlewares=middlewares,
    )
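
resolve_custom_func wires a user-supplied parser or decoder so that it receives the default AioPikaParser.parse_message / decode_message as its second argument and can delegate to it. A hedged sketch of what such callables might look like (the function names and the RabbitMessage import path are assumptions based on the type hints above):

from typing import Any, Awaitable, Callable

import aio_pika

from faststream.rabbit.message import RabbitMessage


async def custom_parser(
    msg: aio_pika.IncomingMessage,
    original_parser: Callable[[aio_pika.IncomingMessage], Awaitable[RabbitMessage]],
) -> RabbitMessage:
    # Delegate to the default AioPikaParser.parse_message, then adjust the result if needed.
    return await original_parser(msg)


async def custom_decoder(
    msg: RabbitMessage,
    original_decoder: Callable[[RabbitMessage], Awaitable[Any]],
) -> Any:
    # Fall back to the default AioPikaParser.decode_message behaviour.
    return await original_decoder(msg)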

close async #

close() -> None
Source code in faststream/rabbit/handler.py
async def close(self) -> None:
    if self._queue_obj is not None:
        if self._consumer_tag is not None:  # pragma: no branch
            await self._queue_obj.cancel(self._consumer_tag)
            self._consumer_tag = None
        self._queue_obj = None

consume async #

consume(msg: MsgType) -> SendableMessage

Consume a message asynchronously.

PARAMETER DESCRIPTION
- msg (MsgType): the message to be consumed

RETURNS DESCRIPTION
- SendableMessage: the sendable message

RAISES DESCRIPTION
- StopConsume: if the consumption needs to be stopped
- Exception: if an error occurs during consumption

Note

The above docstring is autogenerated by docstring-gen library (https://docstring-gen.airt.ai)

Source code in faststream/broker/handler.py
@override
async def consume(self, msg: MsgType) -> SendableMessage:  # type: ignore[override]
    """Consume a message asynchronously.

    Args:
        msg: The message to be consumed.

    Returns:
        The sendable message.

    Raises:
        StopConsume: If the consumption needs to be stopped.

    Raises:
        Exception: If an error occurs during consumption.
    !!! note

        The above docstring is autogenerated by docstring-gen library (https://docstring-gen.airt.ai)
    """
    result: Optional[WrappedReturn[SendableMessage]] = None
    result_msg: SendableMessage = None

    async with AsyncExitStack() as stack:
        gl_middlewares: List[BaseMiddleware] = []

        stack.enter_context(context.scope("handler_", self))

        for m in self.global_middlewares:
            gl_middlewares.append(await stack.enter_async_context(m(msg)))

        logged = False
        processed = False
        for handler, filter_, parser, decoder, middlewares, _ in self.calls:
            local_middlewares: List[BaseMiddleware] = []
            for local_m in middlewares:
                local_middlewares.append(
                    await stack.enter_async_context(local_m(msg))
                )

            all_middlewares = gl_middlewares + local_middlewares

            # TODO: add parser & decoder caches
            message = await parser(msg)

            if not logged:  # pragma: no branch
                log_context_tag = context.set_local(
                    "log_context", self.log_context_builder(message)
                )

            message.decoded_body = await decoder(message)
            message.processed = processed

            if await filter_(message):
                assert (  # nosec B101
                    not processed
                ), "You can't proccess a message with multiple consumers"

                try:
                    async with AsyncExitStack() as consume_stack:
                        for m_consume in all_middlewares:
                            message.decoded_body = (
                                await consume_stack.enter_async_context(
                                    m_consume.consume_scope(message.decoded_body)
                                )
                            )

                        result = await cast(
                            Awaitable[Optional[WrappedReturn[SendableMessage]]],
                            handler.call_wrapped(message),
                        )

                    if result is not None:
                        result_msg, pub_response = result

                        # TODO: suppress all publishing errors and raise them after all publishers will be tried
                        for publisher in (pub_response, *handler._publishers):
                            if publisher is not None:
                                async with AsyncExitStack() as pub_stack:
                                    result_to_send = result_msg

                                    for m_pub in all_middlewares:
                                        result_to_send = (
                                            await pub_stack.enter_async_context(
                                                m_pub.publish_scope(result_to_send)
                                            )
                                        )

                                    await publisher.publish(
                                        message=result_to_send,
                                        correlation_id=message.correlation_id,
                                    )

                except StopConsume:
                    await self.close()
                    handler.trigger()

                except HandlerException as e:  # pragma: no cover
                    handler.trigger()
                    raise e

                except Exception as e:
                    handler.trigger(error=e)
                    raise e

                else:
                    handler.trigger(result=result[0] if result else None)
                    message.processed = processed = True
                    if IS_OPTIMIZED:  # pragma: no cover
                        break

        assert processed, "You have to consume message"  # nosec B101

    context.reset_local("log_context", log_context_tag)
    return result_msg
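
The StopConsume branch above closes the consumer and triggers the handler without treating the exception as an error. A minimal broker-level sketch of how a subscriber might use it (the queue name is illustrative and the default broker URL is assumed):

from faststream import FastStream
from faststream.exceptions import StopConsume
from faststream.rabbit import RabbitBroker

broker = RabbitBroker()
app = FastStream(broker)


@broker.subscriber("stop-after-first")
async def handle(body: str) -> None:
    # Raising StopConsume inside the wrapped call makes Handler.consume()
    # call self.close(), cancelling this queue's consumer tag.
    raise StopConsume()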

get_payloads #

get_payloads() -> List[Tuple[AnyDict, str]]
Source code in faststream/broker/handler.py
def get_payloads(self) -> List[Tuple[AnyDict, str]]:
    payloads: List[Tuple[AnyDict, str]] = []

    for h, _, _, _, _, dep in self.calls:
        body = parse_handler_params(
            dep, prefix=f"{self._title or self.call_name}:Message"
        )
        payloads.append((body, to_camelcase(unwrap(h._original_call).__name__)))

    return payloads

name #

name() -> str
Source code in faststream/asyncapi/base.py
@abstractproperty
def name(self) -> str:
    raise NotImplementedError()

schema #

schema() -> Dict[str, Channel]
Source code in faststream/rabbit/asyncapi.py
def schema(self) -> Dict[str, Channel]:
    payloads = self.get_payloads()

    handler_name = (
        self._title
        or f"{self.queue.name}:{getattr(self.exchange, 'name', '_')}:{self.call_name}"
    )

    return {
        handler_name: Channel(
            description=self.description,  # type: ignore[attr-defined]
            subscribe=Operation(
                bindings=OperationBinding(
                    amqp=amqp.OperationBinding(
                        cc=self.queue.routing,
                    ),
                )
                if _is_exchange(self.exchange)
                else None,
                message=Message(
                    title=f"{handler_name}:Message",
                    payload=resolve_payloads(payloads),
                    correlationId=CorrelationId(
                        location="$message.header#/correlation_id"
                    ),
                ),
            ),
            bindings=ChannelBinding(
                amqp=amqp.ChannelBinding(
                    **{
                        "is": "routingKey",  # type: ignore
                        "queue": amqp.Queue(
                            name=self.queue.name,
                            durable=self.queue.durable,
                            exclusive=self.queue.exclusive,
                            autoDelete=self.queue.auto_delete,
                            vhost=self.virtual_host,
                        )
                        if _is_exchange(self.exchange)
                        else None,
                        "exchange": (
                            amqp.Exchange(type="default", vhost=self.virtual_host)
                            if self.exchange is None
                            else amqp.Exchange(
                                type=self.exchange.type,  # type: ignore
                                name=self.exchange.name,
                                durable=self.exchange.durable,
                                autoDelete=self.exchange.auto_delete,
                                vhost=self.virtual_host,
                            )
                        ),
                    }
                )
            ),
        )
    }
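
The channel key is self._title when a title was given; otherwise it is composed from the queue name, the exchange name (or "_"), and the handler's call name. A short illustration, reusing the hypothetical handler from the construction sketch above:

channels = handler.schema()

# title="OrdersHandler" was passed, so the single channel key is "OrdersHandler".
# Without a title it would be built from the queue, exchange, and call names,
# e.g. "orders:events:<call_name>".
for name, channel in channels.items():
    print(name, channel.description)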

start async #

start(declarer: RabbitDeclarer) -> None

Starts the consumer for the RabbitMQ queue.

PARAMETER DESCRIPTION
- declarer (RabbitDeclarer): RabbitDeclarer object used to declare the queue and exchange

RETURNS DESCRIPTION
- None

Note

The above docstring is autogenerated by docstring-gen library (https://docstring-gen.airt.ai)

Source code in faststream/rabbit/handler.py
@override
async def start(self, declarer: RabbitDeclarer) -> None:  # type: ignore[override]
    """Starts the consumer for the RabbitMQ queue.

    Args:
        declarer: RabbitDeclarer object used to declare the queue and exchange

    Returns:
        None
    !!! note

        The above docstring is autogenerated by docstring-gen library (https://docstring-gen.airt.ai)
    """
    self._queue_obj = queue = await declarer.declare_queue(self.queue)

    if self.exchange is not None:
        exchange = await declarer.declare_exchange(self.exchange)
        await queue.bind(
            exchange,
            routing_key=self.queue.routing,
            arguments=self.queue.bind_arguments,
        )

    self._consumer_tag = await queue.consume(
        # NOTE: aio-pika expects AbstractIncomingMessage, not IncomingMessage
        self.consume,  # type: ignore[arg-type]
        arguments=self.consume_args,
    )

Last update: 2023-11-13