Skip to content

Handler

faststream.nats.asyncapi.Handler #

Handler(
    subject: str,
    log_context_builder: Callable[
        [StreamMessage[Any]], Dict[str, str]
    ],
    queue: str = "",
    stream: Optional[JStream] = None,
    pull_sub: Optional[PullSub] = None,
    extra_options: Optional[AnyDict] = None,
    description: Optional[str] = None,
    title: Optional[str] = None,
)

Bases: LogicNatsHandler

Source code in faststream/nats/handler.py
def __init__(
    self,
    subject: str,
    log_context_builder: Callable[[StreamMessage[Any]], Dict[str, str]],
    queue: str = "",
    stream: Optional[JStream] = None,
    pull_sub: Optional[PullSub] = None,
    extra_options: Optional[AnyDict] = None,
    # AsyncAPI information
    description: Optional[str] = None,
    title: Optional[str] = None,
):
    """Set up a NATS handler for the given subject.

    Args:
        subject: Subject pattern; it is passed through ``compile_path`` and
            the resulting path is what gets stored as ``self.subject``.
        log_context_builder: Callable building the log context from a
            stream message; forwarded to the base handler.
        queue: Queue name stored on the handler.
        stream: Optional stream object stored on the handler.
        pull_sub: Optional pull-subscription settings stored on the handler.
        extra_options: Extra subscription options; an empty dict is used
            when nothing (or a falsy value) is given.
        description: AsyncAPI description, forwarded to the base handler.
        title: AsyncAPI title, forwarded to the base handler.
    """
    # compile_path returns a (regex, path) pair.
    # NOTE(review): presumably ``replace_symbol="*"`` swaps path parameters
    # for the NATS single-token wildcard -- confirm against compile_path.
    compiled_regex, compiled_subject = compile_path(subject, replace_symbol="*")

    # Subscription settings.
    self.stream = stream
    self.pull_sub = pull_sub
    self.queue = queue
    self.extra_options = extra_options if extra_options else {}

    # Routing information derived from the subject.
    self.path_regex = compiled_regex
    self.subject = compiled_subject

    super().__init__(
        log_context_builder=log_context_builder,
        description=description,
        title=title,
    )

    # Runtime state: nothing is subscribed or running until ``start``.
    self.subscription = None
    self.task = None

call_name property #

call_name: str

calls instance-attribute #

calls: List[
    Tuple[
        HandlerCallWrapper[MsgType, Any, SendableMessage],
        Callable[[StreamMessage[MsgType]], Awaitable[bool]],
        AsyncParser[MsgType, Any],
        AsyncDecoder[StreamMessage[MsgType]],
        Sequence[Callable[[Any], BaseMiddleware]],
        CallModel[Any, SendableMessage],
    ]
]

description property #

description: Optional[str]

extra_options instance-attribute #

extra_options = extra_options or {}

global_middlewares instance-attribute #

global_middlewares: Sequence[
    Callable[[Any], BaseMiddleware]
] = []

log_context_builder instance-attribute #

log_context_builder = log_context_builder

path_regex instance-attribute #

path_regex = reg

pull_sub instance-attribute #

pull_sub = pull_sub

queue instance-attribute #

queue = queue

stream instance-attribute #

stream = stream

subject instance-attribute #

subject = path

subscription instance-attribute #

subscription: Union[
    None,
    Subscription,
    JetStreamContext.PushSubscription,
    JetStreamContext.PullSubscription,
] = None

task class-attribute instance-attribute #

task: Optional[asyncio.Task[Any]] = None

add_call #

add_call(
    *,
    handler: HandlerCallWrapper[
        Msg, P_HandlerParams, T_HandlerReturn
    ],
    dependant: CallModel[P_HandlerParams, T_HandlerReturn],
    parser: Optional[CustomParser[Msg, NatsMessage]],
    decoder: Optional[CustomDecoder[NatsMessage]],
    filter: Filter[NatsMessage],
    middlewares: Optional[
        Sequence[Callable[[Msg], BaseMiddleware]]
    ]
) -> None
Source code in faststream/nats/handler.py
def add_call(
    self,
    *,
    handler: HandlerCallWrapper[Msg, P_HandlerParams, T_HandlerReturn],
    dependant: CallModel[P_HandlerParams, T_HandlerReturn],
    parser: Optional[CustomParser[Msg, NatsMessage]],
    decoder: Optional[CustomDecoder[NatsMessage]],
    filter: Filter[NatsMessage],
    middlewares: Optional[Sequence[Callable[[Msg], BaseMiddleware]]],
) -> None:
    """Register a call on this handler with NATS-specific parsing defaults.

    The default parser/decoder pair comes from ``JsParser`` when the handler
    has a stream and from ``Parser`` otherwise. The custom ``parser`` and
    ``decoder`` are combined with those defaults via ``resolve_custom_func``
    before everything is forwarded to the base ``add_call``.

    Args:
        handler: Wrapped handler call to register.
        dependant: Call model describing the handler's parameters.
        parser: Optional custom parser.
        decoder: Optional custom decoder.
        filter: Filter deciding whether the call accepts a message.
        middlewares: Optional middlewares local to this call.
    """
    default_impl = JsParser if self.stream is not None else Parser

    resolved_parser = resolve_custom_func(parser, default_impl.parse_message)
    resolved_decoder = resolve_custom_func(decoder, default_impl.decode_message)

    super().add_call(
        handler=handler,
        parser=resolved_parser,
        decoder=resolved_decoder,
        filter=filter,  # type: ignore[arg-type]
        dependant=dependant,
        middlewares=middlewares,
    )

close async #

close() -> None
Source code in faststream/nats/handler.py
async def close(self) -> None:
    """Tear down the active subscription and the background task, if any.

    The subscription is unsubscribed and cleared first; afterwards the task
    (when present) is cancelled and cleared. Attributes that are already
    ``None`` are left untouched.
    """
    active_subscription = self.subscription
    if active_subscription is not None:
        await active_subscription.unsubscribe()
        self.subscription = None

    # Read the task only after the await above, exactly when it is needed.
    running_task = self.task
    if running_task is not None:
        running_task.cancel()
        self.task = None

consume async #

consume(msg: MsgType) -> SendableMessage

Consume a message asynchronously.

PARAMETER DESCRIPTION
msg

The message to be consumed.

TYPE: MsgType

RETURNS DESCRIPTION
SendableMessage

The sendable message.

RAISES DESCRIPTION
StopConsume

If the consumption needs to be stopped.

Exception

If an error occurs during consumption.

Note

The above docstring is autogenerated by docstring-gen library (https://docstring-gen.airt.ai)

Source code in faststream/broker/handler.py
@override
async def consume(self, msg: MsgType) -> SendableMessage:  # type: ignore[override]
    """Consume a message asynchronously.

    For every registered call the raw message is parsed and decoded. A call
    whose filter accepts the message runs inside the consume scopes of the
    global and call-local middlewares, and its result (if any) is published
    through the response publisher and every publisher attached to the call.

    Args:
        msg: The message to be consumed.

    Returns:
        The message returned by the matched call, or ``None`` when the call
        produced no result.

    Raises:
        HandlerException: Re-raised after ``handler.trigger()`` is called.
        Exception: Any other error raised during consumption, re-raised
            after it is passed to ``handler.trigger(error=...)``.
        AssertionError: If no call processed the message, or a call accepts
            a message that was already processed (only when assertions are
            enabled).

    Note:
        ``StopConsume`` is not propagated: it closes the handler through
        ``self.close()`` and triggers the call without a result.
    """
    result: Optional[WrappedReturn[SendableMessage]] = None
    result_msg: SendableMessage = None

    # ``logged`` guards both the one-time ``set_local`` below and the
    # matching ``reset_local`` in the ``finally`` clause.
    logged = False

    try:
        async with AsyncExitStack() as stack:
            gl_middlewares: List[BaseMiddleware] = []

            stack.enter_context(context.scope("handler_", self))

            for m in self.global_middlewares:
                gl_middlewares.append(await stack.enter_async_context(m(msg)))

            processed = False
            for handler, filter_, parser, decoder, middlewares, _ in self.calls:
                local_middlewares: List[BaseMiddleware] = []
                for local_m in middlewares:
                    local_middlewares.append(
                        await stack.enter_async_context(local_m(msg))
                    )

                all_middlewares = gl_middlewares + local_middlewares

                # TODO: add parser & decoder caches
                message = await parser(msg)

                if not logged:
                    # Set the log context only once per message so that the
                    # single token kept here restores the pre-consume value.
                    log_context_tag = context.set_local(
                        "log_context", self.log_context_builder(message)
                    )
                    logged = True

                message.decoded_body = await decoder(message)
                message.processed = processed

                if await filter_(message):
                    assert (  # nosec B101
                        not processed
                    ), "You can't proccess a message with multiple consumers"

                    try:
                        async with AsyncExitStack() as consume_stack:
                            for m_consume in all_middlewares:
                                message.decoded_body = (
                                    await consume_stack.enter_async_context(
                                        m_consume.consume_scope(
                                            message.decoded_body
                                        )
                                    )
                                )

                            result = await cast(
                                Awaitable[Optional[WrappedReturn[SendableMessage]]],
                                handler.call_wrapped(message),
                            )

                        if result is not None:
                            result_msg, pub_response = result

                            # TODO: suppress all publishing errors and raise them after all publishers will be tried
                            for publisher in (pub_response, *handler._publishers):
                                if publisher is not None:
                                    async with AsyncExitStack() as pub_stack:
                                        result_to_send = result_msg

                                        for m_pub in all_middlewares:
                                            result_to_send = (
                                                await pub_stack.enter_async_context(
                                                    m_pub.publish_scope(
                                                        result_to_send
                                                    )
                                                )
                                            )

                                        await publisher.publish(
                                            message=result_to_send,
                                            correlation_id=message.correlation_id,
                                        )

                    except StopConsume:
                        # NOTE(review): ``processed`` stays False on this
                        # path, so the final assertion below still fires
                        # unless another call handles the message -- confirm
                        # this is intended.
                        await self.close()
                        handler.trigger()

                    except HandlerException:  # pragma: no cover
                        handler.trigger()
                        raise

                    except Exception as e:
                        handler.trigger(error=e)
                        raise

                    else:
                        handler.trigger(result=result[0] if result else None)
                        message.processed = processed = True
                        if IS_OPTIMIZED:  # pragma: no cover
                            break

            assert processed, "You have to consume message"  # nosec B101

    finally:
        # Runs after the middleware stack has exited, on success and on
        # error alike, so the log context never outlives this message.
        if logged:
            context.reset_local("log_context", log_context_tag)

    return result_msg

get_payloads #

get_payloads() -> List[Tuple[AnyDict, str]]
Source code in faststream/broker/handler.py
def get_payloads(self) -> List[Tuple[AnyDict, str]]:
    """Collect one ``(payload, name)`` pair per registered call.

    Returns:
        A list with, for each entry of ``self.calls``, the result of
        ``parse_handler_params`` for the call's dependant (prefixed with the
        handler title, or the call name when no title is set, followed by
        ``":Message"``) and the camel-cased ``__name__`` of the unwrapped
        original call.
    """
    return [
        (
            parse_handler_params(
                dependant, prefix=f"{self._title or self.call_name}:Message"
            ),
            to_camelcase(unwrap(call._original_call).__name__),
        )
        for call, _, _, _, _, dependant in self.calls
    ]

get_routing_hash staticmethod #

get_routing_hash(subject: str) -> str
Source code in faststream/nats/handler.py
@staticmethod
def get_routing_hash(subject: str) -> str:
    return subject

name #

name() -> str
Source code in faststream/asyncapi/base.py
@abstractproperty
def name(self) -> str:
    """Abstract property: subclasses must provide the name.

    Raises:
        NotImplementedError: Always, when this base implementation is
            reached.
    """
    # NOTE: ``abc.abstractproperty`` is deprecated since Python 3.3 in favour
    # of stacking ``@property`` on ``@abstractmethod``.
    raise NotImplementedError

schema #

schema() -> Dict[str, Channel]
Source code in faststream/nats/asyncapi.py
def schema(self) -> Dict[str, Channel]:
    """Build the AsyncAPI channel description for this handler.

    Returns:
        A single-entry mapping from the handler name (the title, or
        ``"<subject>:<call_name>"`` when no title is set) to a ``Channel``
        carrying the description, the subscribe operation with its message
        payload, and the NATS binding (subject plus queue, with an empty
        queue reported as ``None``).
    """
    payloads = self.get_payloads()
    handler_name = self._title or f"{self.subject}:{self.call_name}"
    channel_description = self.description

    subscribe_message = Message(
        title=f"{handler_name}:Message",
        payload=resolve_payloads(payloads),
        correlationId=CorrelationId(location="$message.header#/correlation_id"),
    )
    subscribe_operation = Operation(message=subscribe_message)

    nats_binding = nats.ChannelBinding(
        subject=self.subject,
        queue=self.queue or None,
    )

    channel = Channel(
        description=channel_description,
        subscribe=subscribe_operation,
        bindings=ChannelBinding(nats=nats_binding),
    )
    return {handler_name: channel}

start async #

start(connection: Union[Client, JetStreamContext]) -> None
Source code in faststream/nats/handler.py
@override
async def start(self, connection: Union[Client, JetStreamContext]) -> None:  # type: ignore[override]
    """Subscribe the handler to its subject on the given connection.

    Without pull-subscription settings, a regular subscription is created
    with ``self.consume`` as the callback. With them, a pull subscription is
    created and a background task running ``self._consume()`` is started.

    Args:
        connection: Connection object used to create the subscription; it is
            treated as a ``JetStreamContext`` on the pull path.

    Raises:
        ValueError: If pull-subscription settings are given without a stream.
    """
    if self.pull_sub is None:
        # Push path: messages are delivered straight to ``consume``.
        self.subscription = await connection.subscribe(
            subject=self.subject,
            queue=self.queue,
            cb=self.consume,  # type: ignore[arg-type]
            **self.extra_options,
        )
        return

    if self.stream is None:
        raise ValueError("Pull subscriber can be used only with a stream")

    jetstream = cast(JetStreamContext, connection)
    self.subscription = await jetstream.pull_subscribe(
        subject=self.subject,
        **self.extra_options,
    )
    self.task = asyncio.create_task(self._consume())

Last update: 2023-11-13