Skip to content

AsyncAPIBatchSubscriber

faststream.kafka.subscriber.asyncapi.AsyncAPIBatchSubscriber #

AsyncAPIBatchSubscriber(*topics, batch_timeout_ms, max_records, group_id, listener, pattern, connection_args, partitions, is_manual, no_ack, no_reply, retry, broker_dependencies, broker_middlewares, title_, description_, include_in_schema)

Bases: BatchSubscriber, AsyncAPISubscriber[Tuple['ConsumerRecord', ...]]

Source code in faststream/kafka/subscriber/usecase.py
def __init__(
    self,
    *topics: str,
    batch_timeout_ms: int,
    max_records: Optional[int],
    # Kafka information
    group_id: Optional[str],
    listener: Optional["ConsumerRebalanceListener"],
    pattern: Optional[str],
    connection_args: "AnyDict",
    partitions: Iterable["TopicPartition"],
    is_manual: bool,
    # Subscriber args
    no_ack: bool,
    no_reply: bool,
    retry: bool,
    broker_dependencies: Iterable["Depends"],
    broker_middlewares: Iterable[
        "BrokerMiddleware[Sequence[Tuple[ConsumerRecord, ...]]]"
    ],
    # AsyncAPI args
    title_: Optional[str],
    description_: Optional[str],
    include_in_schema: bool,
) -> None:
    """Initialize the batch subscriber: store batch options, build the parser, delegate the rest."""
    self.batch_timeout_ms = batch_timeout_ms
    self.max_records = max_records

    # A wildcard subscription pattern is turned into a real regex; note that
    # ``compile_path`` also rewrites ``pattern`` itself before it reaches the
    # consumer subscription below.
    topic_regex = None
    if pattern:
        topic_regex, pattern = compile_path(
            pattern,
            replace_symbol=".*",
            patch_regex=lambda p: p.replace(r"\*", ".*"),
        )

    # Manual-commit mode wraps records into an ack-able message type.
    message_class = KafkaAckableMessage if is_manual else KafkaMessage
    batch_parser = AioKafkaBatchParser(
        msg_class=message_class,
        regex=topic_regex,
    )

    super().__init__(
        *topics,
        group_id=group_id,
        listener=listener,
        pattern=pattern,
        connection_args=connection_args,
        partitions=partitions,
        # subscriber args
        default_parser=batch_parser.parse_message,
        default_decoder=batch_parser.decode_message,
        # Propagated args
        no_ack=no_ack,
        no_reply=no_reply,
        retry=retry,
        broker_middlewares=broker_middlewares,
        broker_dependencies=broker_dependencies,
        # AsyncAPI args
        title_=title_,
        description_=description_,
        include_in_schema=include_in_schema,
    )

title_ instance-attribute #

title_ = title_

description_ instance-attribute #

description_ = description_

include_in_schema instance-attribute #

include_in_schema = include_in_schema

name property #

name

Returns the name of the API operation.

description property #

description

Returns the description of the API operation.

calls instance-attribute #

calls = []

running instance-attribute #

running = False

call_name property #

call_name

Returns the name of the handler call.

lock instance-attribute #

extra_watcher_options instance-attribute #

extra_watcher_options = {}

extra_context instance-attribute #

extra_context = {}

graceful_timeout instance-attribute #

graceful_timeout = None

topics instance-attribute #

topics = topics

group_id instance-attribute #

group_id = group_id

builder instance-attribute #

builder = None

consumer instance-attribute #

consumer = None

task instance-attribute #

task = None

client_id instance-attribute #

client_id = ''

batch instance-attribute #

batch

partitions instance-attribute #

partitions = partitions

topic_names property #

topic_names

batch_timeout_ms instance-attribute #

batch_timeout_ms = batch_timeout_ms

max_records instance-attribute #

max_records = max_records

setup #

setup(*, client_id, builder, logger, producer, graceful_timeout, extra_context, broker_parser, broker_decoder, apply_types, is_validate, _get_dependant, _call_decorators)
Source code in faststream/kafka/subscriber/usecase.py
@override
def setup(  # type: ignore[override]
    self,
    *,
    client_id: Optional[str],
    builder: Callable[..., "AIOKafkaConsumer"],
    # basic args
    logger: Optional["LoggerProto"],
    producer: Optional["ProducerProto"],
    graceful_timeout: Optional[float],
    extra_context: "AnyDict",
    # broker options
    broker_parser: Optional["CustomCallable"],
    broker_decoder: Optional["CustomCallable"],
    # dependant args
    apply_types: bool,
    is_validate: bool,
    _get_dependant: Optional[Callable[..., Any]],
    _call_decorators: Iterable["Decorator"],
) -> None:
    """Attach broker-level resources to the subscriber before it is started.

    The Kafka-specific pieces (``client_id`` and the consumer ``builder``)
    are kept on this instance; everything else is delegated to the base
    subscriber setup.
    """
    self.builder = builder
    self.client_id = client_id

    super().setup(
        logger=logger,
        producer=producer,
        graceful_timeout=graceful_timeout,
        extra_context=extra_context,
        broker_parser=broker_parser,
        broker_decoder=broker_decoder,
        apply_types=apply_types,
        is_validate=is_validate,
        _get_dependant=_get_dependant,
        _call_decorators=_call_decorators,
    )

add_prefix #

add_prefix(prefix)
Source code in faststream/kafka/subscriber/usecase.py
def add_prefix(self, prefix: str) -> None:
    """Prepend *prefix* to every subscribed topic and every assigned partition's topic."""
    self.topics = tuple(f"{prefix}{topic}" for topic in self.topics)

    self.partitions = [
        TopicPartition(
            topic=f"{prefix}{partition.topic}",
            partition=partition.partition,
        )
        for partition in self.partitions
    ]

schema #

schema()

Returns the schema of the API operation as a dictionary of channel names and channel objects.

Source code in faststream/asyncapi/abc.py
def schema(self) -> Dict[str, Channel]:
    """Returns the schema of the API operation as a dictionary of channel names and channel objects."""
    # Operations excluded from the schema contribute an empty mapping.
    return self.get_schema() if self.include_in_schema else {}

add_middleware #

add_middleware(middleware)
Source code in faststream/broker/subscriber/usecase.py
def add_middleware(self, middleware: "BrokerMiddleware[MsgType]") -> None:
    """Append one more broker middleware to this subscriber's chain."""
    # The chain is kept as an immutable tuple; extend by rebuilding it.
    self._broker_middlewares = tuple(self._broker_middlewares) + (middleware,)

get_log_context #

get_log_context(message)
Source code in faststream/kafka/subscriber/usecase.py
def get_log_context(
    self,
    message: Optional["StreamMessage[Tuple[ConsumerRecord, ...]]"],
) -> Dict[str, str]:
    """Build the logging context for a (possibly absent) batch message.

    Without a message only the full watched-topic list can be reported;
    with one, the first record of the batch names the concrete topic.
    """
    topic = (
        ",".join(self.topic_names)
        if message is None
        else message.raw_message[0].topic
    )

    return self.build_log_context(
        message=message,
        topic=topic,
        group_id=self.group_id,
    )

start async #

start()

Start the consumer.

Source code in faststream/kafka/subscriber/usecase.py
async def start(self) -> None:
    """Start the consumer."""
    assert self.builder, "You should setup subscriber at first."  # nosec B101

    consumer = self.builder(
        group_id=self.group_id,
        client_id=self.client_id,
        **self.__connection_args,
    )
    self.consumer = consumer

    # Topic/pattern subscription takes precedence over explicit partition
    # assignment — only one of the two is applied.
    if self.topics or self._pattern:
        consumer.subscribe(
            topics=self.topics,
            pattern=self._pattern,
            listener=self.__listener,
        )
    elif self.partitions:
        consumer.assign(partitions=self.partitions)

    await consumer.start()
    await super().start()

    # Spawn the polling loop only when there are handlers to feed.
    if self.calls:
        self.task = asyncio.create_task(self._consume())

close async #

close()
Source code in faststream/kafka/subscriber/usecase.py
async def close(self) -> None:
    """Stop the base subscriber, the AIOKafka consumer, and the polling task."""
    await super().close()

    if self.consumer is not None:
        await self.consumer.stop()
        self.consumer = None

    # Cancel the polling loop if it is still alive.
    if (task := self.task) is not None and not task.done():
        task.cancel()

    self.task = None

consume async #

consume(msg)

Consume a message asynchronously.

Source code in faststream/broker/subscriber/usecase.py
async def consume(self, msg: MsgType) -> Any:
    """Consume a message asynchronously.

    Does nothing while the subscriber is not running. ``StopConsume`` and
    ``SystemExit`` raised by processing shut this subscriber down; any other
    exception is deliberately swallowed here.
    """
    if not self.running:
        return None

    try:
        return await self.process_message(msg)

    except StopConsume:
        # Stop handler at StopConsume exception
        await self.close()

    except SystemExit:
        # Stop handler at `exit()` call
        await self.close()

        # Propagate shutdown to the application object, if one is registered
        # in the DI context.
        if app := context.get("app"):
            app.exit()

    except Exception:  # nosec B110
        # All other exceptions were logged by CriticalLogMiddleware
        pass

process_message async #

process_message(msg)

Execute all message processing stages.

Source code in faststream/broker/subscriber/usecase.py
async def process_message(self, msg: MsgType) -> "Response":
    """Execute all message processing stages.

    Walks the registered calls in order, runs the first one whose filter
    matches *msg* inside the acknowledgement-watcher and middleware scopes,
    publishes the result to the response/handler publishers, and returns it.
    Raises the stored parsing error (when filtering itself failed) or
    ``SubscriberNotFound`` when no call matched.
    """
    async with AsyncExitStack() as stack:
        stack.enter_context(self.lock)

        # Enter context before middlewares
        for k, v in self.extra_context.items():
            stack.enter_context(context.scope(k, v))

        stack.enter_context(context.scope("handler_", self))

        # enter all middlewares
        middlewares: List[BaseMiddleware] = []
        for base_m in self._broker_middlewares:
            middleware = base_m(msg)
            middlewares.append(middleware)
            await middleware.__aenter__()

        # ``cache`` is shared across ``is_suitable`` checks for this message.
        cache: Dict[Any, Any] = {}
        parsing_error: Optional[Exception] = None
        for h in self.calls:
            try:
                message = await h.is_suitable(msg, cache)
            except Exception as e:
                # Remember the failure and fall through to the common
                # middleware-exit path below before re-raising.
                parsing_error = e
                break

            if message is not None:
                # Acknowledgement scope
                # TODO: move it to scope enter at `retry` option deprecation
                await stack.enter_async_context(
                    self.watcher(
                        message,
                        **self.extra_watcher_options,
                    )
                )

                stack.enter_context(
                    context.scope("log_context", self.get_log_context(message))
                )
                stack.enter_context(context.scope("message", message))

                # Middlewares should be exited before scope release
                for m in middlewares:
                    stack.push_async_exit(m.__aexit__)

                result_msg = ensure_response(
                    await h.call(
                        message=message,
                        # consumer middlewares
                        _extra_middlewares=(m.consume_scope for m in middlewares),
                    )
                )

                # Inherit the inbound correlation id when the handler did not
                # set one on its response.
                if not result_msg.correlation_id:
                    result_msg.correlation_id = message.correlation_id

                for p in chain(
                    self.__get_response_publisher(message),
                    h.handler._publishers,
                ):
                    await p.publish(
                        result_msg.body,
                        **result_msg.as_publish_kwargs(),
                        # publisher middlewares
                        _extra_middlewares=(m.publish_scope for m in middlewares),
                    )

                # Return data for tests
                return result_msg

        # Suitable handler was not found or
        # parsing/decoding exception occurred
        for m in middlewares:
            stack.push_async_exit(m.__aexit__)

        if parsing_error:
            raise parsing_error

        else:
            raise SubscriberNotFound(f"There is no suitable handler for {msg=}")

    # An error was raised and processed by some middleware
    return ensure_response(None)

get_one async #

get_one(*, timeout=5.0)
Source code in faststream/kafka/subscriber/usecase.py
@override
async def get_one(
    self,
    *,
    timeout: float = 5.0,
) -> "Optional[StreamMessage[MsgType]]":
    """Fetch a single message directly from the consumer.

    Only usable on a started subscriber with no registered handler calls.
    Returns ``None`` when nothing arrives within *timeout* seconds.
    """
    assert self.consumer, "You should start subscriber at first."  # nosec B101
    assert (  # nosec B101
        not self.calls
    ), "You can't use `get_one` method if subscriber has registered handlers."

    # ``getmany`` expects milliseconds; the poll is capped at one record.
    raw_messages = await self.consumer.getmany(
        timeout_ms=timeout * 1000, max_records=1
    )

    if not raw_messages:
        return None

    # Exactly one partition entry holding exactly one record is expected
    # here because of ``max_records=1``.
    ((raw_message,),) = raw_messages.values()

    msg: StreamMessage[MsgType] = await process_msg(
        msg=raw_message,
        middlewares=self._broker_middlewares,
        parser=self._parser,
        decoder=self._decoder,
    )
    return msg

add_call #

add_call(*, filter_, parser_, decoder_, middlewares_, dependencies_)
Source code in faststream/broker/subscriber/usecase.py
def add_call(
    self,
    *,
    filter_: "Filter[Any]",
    parser_: Optional["CustomCallable"],
    decoder_: Optional["CustomCallable"],
    middlewares_: Iterable["SubscriberMiddleware[Any]"],
    dependencies_: Iterable["Depends"],
) -> Self:
    """Record per-call options for the next registered handler and return self for chaining."""
    options = _CallOptions(
        filter=filter_,
        parser=parser_,
        decoder=decoder_,
        middlewares=middlewares_,
        dependencies=dependencies_,
    )
    self._call_options = options
    return self

get_name #

get_name()
Source code in faststream/kafka/subscriber/asyncapi.py
def get_name(self) -> str:
    """Build the AsyncAPI operation name: ``<topic,...>:<handler call name>``."""
    joined_topics = ",".join(self.topics)
    return f"{joined_topics}:{self.call_name}"

get_description #

get_description()

Returns the description of the handler.

Source code in faststream/broker/subscriber/usecase.py
def get_description(self) -> Optional[str]:
    """Returns the description of the handler."""
    # The description comes from the first registered handler call, if any.
    if not self.calls:  # pragma: no cover
        return None
    return self.calls[0].description

get_schema #

get_schema()
Source code in faststream/kafka/subscriber/asyncapi.py
def get_schema(self) -> Dict[str, Channel]:
    """Build one AsyncAPI channel per subscribed topic.

    NOTE: when ``title_`` is set, every topic maps to the same channel name,
    so later topics overwrite earlier ones in the returned mapping.
    """
    payloads = self.get_payloads()
    channels: Dict[str, Channel] = {}

    for topic in self.topics:
        channel_name = self.title_ or f"{topic}:{self.call_name}"

        channels[channel_name] = Channel(
            description=self.description,
            subscribe=Operation(
                message=Message(
                    title=f"{channel_name}:Message",
                    payload=resolve_payloads(payloads),
                    correlationId=CorrelationId(
                        location="$message.header#/correlation_id"
                    ),
                ),
            ),
            bindings=ChannelBinding(
                kafka=kafka.ChannelBinding(topic=topic),
            ),
        )

    return channels

get_payloads #

get_payloads()

Get the payloads of the handler.

Source code in faststream/broker/subscriber/usecase.py
def get_payloads(self) -> List[Tuple["AnyDict", str]]:
    """Get the payloads of the handler.

    Raises:
        SetupError: if any registered call has not been set up yet.
    """
    payloads: List[Tuple[AnyDict, str]] = []

    for handler in self.calls:
        if handler.dependant is None:
            raise SetupError("You should setup `Handler` at first.")

        payloads.append(
            (
                parse_handler_params(
                    handler.dependant,
                    prefix=f"{self.title_ or self.call_name}:Message",
                ),
                to_camelcase(handler.call_name),
            )
        )

    # With no registered calls, fall back to a stub payload schema.
    if not self.calls:
        payloads.append(
            (
                {
                    "title": f"{self.title_ or self.call_name}:Message:Payload",
                },
                to_camelcase(self.call_name),
            )
        )

    return payloads

get_msg async #

get_msg()
Source code in faststream/kafka/subscriber/usecase.py
async def get_msg(self) -> Tuple["ConsumerRecord", ...]:
    """Poll the consumer for the next batch, flattened across partitions."""
    assert self.consumer, "You should setup subscriber at first."  # nosec B101

    fetched = await self.consumer.getmany(
        timeout_ms=self.batch_timeout_ms,
        max_records=self.max_records,
    )

    if not fetched:  # pragma: no cover
        # Back off for one batch window before the caller polls again.
        await anyio.sleep(self.batch_timeout_ms / 1000)
        return ()

    return tuple(chain.from_iterable(fetched.values()))

get_routing_hash staticmethod #

get_routing_hash(topics, group_id=None)
Source code in faststream/kafka/subscriber/usecase.py
@staticmethod
def get_routing_hash(
    topics: Iterable[str],
    group_id: Optional[str] = None,
) -> int:
    return hash("".join((*topics, group_id or "")))

build_log_context staticmethod #

build_log_context(message, topic, group_id=None)
Source code in faststream/kafka/subscriber/usecase.py
@staticmethod
def build_log_context(
    message: Optional["StreamMessage[Any]"],
    topic: str,
    group_id: Optional[str] = None,
) -> Dict[str, str]:
    return {
        "topic": topic,
        "group_id": group_id or "",
        "message_id": getattr(message, "message_id", ""),
    }