Skip to content

LogicHandler

faststream.kafka.handler.LogicHandler #

LogicHandler(
    *topics: str,
    log_context_builder: Callable[
        [StreamMessage[Any]], Dict[str, str]
    ],
    graceful_timeout: Optional[float] = None,
    group_id: Optional[str] = None,
    client_id: str = "faststream-" + __version__,
    builder: Callable[..., AIOKafkaConsumer],
    is_manual: bool = False,
    batch: bool = False,
    batch_timeout_ms: int = 200,
    max_records: Optional[int] = None,
    title: Optional[str] = None,
    description: Optional[str] = None,
    include_in_schema: bool = True
)

Bases: AsyncHandler[ConsumerRecord]

A class to handle logic for consuming messages from Kafka.

METHOD DESCRIPTION
__init__

constructor method for the LogicHandler class

start

method to start consuming messages from Kafka

close

method to close the Kafka consumer and cancel the consuming task

add_call

method to add a handler call for processing consumed messages

_consume

method to consume messages from Kafka and call the appropriate handler

Initialize a Kafka consumer for the specified topics.

PARAMETER DESCRIPTION
*topics

Variable length argument list of topics to consume from.

TYPE: str DEFAULT: ()

log_context_builder

Callable that builds the log context.

TYPE: Callable[[StreamMessage[Any]], Dict[str, str]]

graceful_timeout

Optional timeout in seconds for graceful shutdown.

TYPE: Optional[float] DEFAULT: None

group_id

Optional group ID for the consumer.

TYPE: Optional[str] DEFAULT: None

client_id

Client ID for the consumer.

TYPE: str DEFAULT: 'faststream-' + __version__

builder

Callable that constructs an AIOKafkaConsumer instance.

TYPE: Callable[..., AIOKafkaConsumer]

is_manual

Flag indicating whether to manually commit offsets.

TYPE: bool DEFAULT: False

batch

Flag indicating whether to consume messages in batches.

TYPE: bool DEFAULT: False

batch_timeout_ms

Timeout in milliseconds for batch consumption.

TYPE: int DEFAULT: 200

max_records

Maximum number of records to consume in a batch.

TYPE: Optional[int] DEFAULT: None

title

Optional title for the consumer.

TYPE: Optional[str] DEFAULT: None

description

Optional description for the consumer.

TYPE: Optional[str] DEFAULT: None

include_in_schema

Flag indicating whether to include the consumer in the API specification.

TYPE: bool DEFAULT: True

RAISES DESCRIPTION
NotImplementedError

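Listed by the original docstring, but the constructor body shown below contains no `raise` statement; this entry appears to be a documentation artifact (verify against the base-class constructor).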

Source code in faststream/kafka/handler.py
@override
def __init__(
    self,
    *topics: str,
    log_context_builder: Callable[[StreamMessage[Any]], Dict[str, str]],
    graceful_timeout: Optional[float] = None,
    # Kafka information
    group_id: Optional[str] = None,
    client_id: str = "faststream-" + __version__,
    builder: Callable[..., AIOKafkaConsumer],
    is_manual: bool = False,
    batch: bool = False,
    batch_timeout_ms: int = 200,
    max_records: Optional[int] = None,
    # AsyncAPI information
    title: Optional[str] = None,
    description: Optional[str] = None,
    include_in_schema: bool = True,
) -> None:
    """Record the configuration of a Kafka consumer handler.

    No consumer is created here: ``consumer`` and ``task`` are initialised
    to ``None`` and ``builder`` is only stored for later use.

    Args:
        *topics: Names of the topics to consume from (stored as a tuple).
        log_context_builder: Callable that builds the log context from a
            message; forwarded to the parent constructor.
        graceful_timeout: Optional timeout in seconds for graceful shutdown;
            forwarded to the parent constructor.
        group_id: Optional consumer group ID.
        client_id: Client ID for the consumer.
        builder: Callable that constructs an ``AIOKafkaConsumer`` instance.
        is_manual: Flag indicating whether offsets are committed manually.
        batch: Flag indicating whether messages are consumed in batches.
        batch_timeout_ms: Timeout in milliseconds for batch consumption.
        max_records: Maximum number of records to consume in a batch.
        title: Optional AsyncAPI title; forwarded to the parent constructor.
        description: Optional AsyncAPI description; forwarded to the parent
            constructor.
        include_in_schema: Whether to include the consumer in the API
            specification; forwarded to the parent constructor.

    """
    super().__init__(
        title=title,
        description=description,
        include_in_schema=include_in_schema,
        log_context_builder=log_context_builder,
        graceful_timeout=graceful_timeout,
    )

    # Runtime state: nothing is built or running yet.
    self.consumer = None
    self.task = None
    self.builder = builder

    # Subscription identity.
    self.topics = topics
    self.group_id = group_id
    self.client_id = client_id

    # Consumption mode.
    self.is_manual = is_manual
    self.batch = batch
    self.max_records = max_records
    self.batch_timeout_ms = batch_timeout_ms

batch class-attribute instance-attribute #

batch: bool = batch

batch_timeout_ms instance-attribute #

batch_timeout_ms = batch_timeout_ms

builder instance-attribute #

builder = builder

call_name property #

call_name: str

Returns the name of the handler call.

calls instance-attribute #

calls: List[
    Tuple[
        HandlerCallWrapper[MsgType, Any, SendableMessage],
        Callable[[StreamMessage[MsgType]], Awaitable[bool]],
        AsyncParser[MsgType, Any],
        AsyncDecoder[StreamMessage[MsgType]],
        Sequence[Callable[[Any], BaseMiddleware]],
        CallModel[Any, SendableMessage],
    ]
]

client_id instance-attribute #

client_id = client_id

consumer class-attribute instance-attribute #

consumer: Optional[AIOKafkaConsumer] = None

description property #

description: Optional[str]

Returns the description of the handler.

global_middlewares instance-attribute #

global_middlewares: Sequence[
    Callable[[Any], BaseMiddleware]
] = []

graceful_timeout instance-attribute #

graceful_timeout = graceful_timeout

group_id class-attribute instance-attribute #

group_id: Optional[str] = group_id

include_in_schema instance-attribute #

include_in_schema = include_in_schema

is_manual instance-attribute #

is_manual = is_manual

lock instance-attribute #

lock = MultiLock()

log_context_builder instance-attribute #

log_context_builder = log_context_builder

max_records instance-attribute #

max_records = max_records

running instance-attribute #

running = False

task class-attribute instance-attribute #

task: Optional[Task[Any]] = None

topics instance-attribute #

topics: Sequence[str] = topics

add_call #

add_call(
    *,
    handler: HandlerCallWrapper[
        ConsumerRecord, P_HandlerParams, T_HandlerReturn
    ],
    dependant: CallModel[P_HandlerParams, T_HandlerReturn],
    parser: CustomParser[
        Union[ConsumerRecord, Tuple[ConsumerRecord, ...]],
        KafkaMessage,
    ],
    decoder: Optional[CustomDecoder[KafkaMessage]],
    filter: Union[
        Filter[KafkaMessage],
        Filter[StreamMessage[Tuple[ConsumerRecord, ...]]],
    ],
    middlewares: Optional[
        Sequence[Callable[[ConsumerRecord], BaseMiddleware]]
    ]
) -> None

Adds a call to the handler.

PARAMETER DESCRIPTION
handler

The handler function to be called.

TYPE: HandlerCallWrapper[ConsumerRecord, P_HandlerParams, T_HandlerReturn]

dependant

The dependant model.

TYPE: CallModel[P_HandlerParams, T_HandlerReturn]

parser

Optional custom parser for parsing the input.

TYPE: CustomParser[Union[ConsumerRecord, Tuple[ConsumerRecord, ...]], KafkaMessage]

decoder

Optional custom decoder for decoding the input.

TYPE: Optional[CustomDecoder[KafkaMessage]]

filter

The filter for filtering the input.

TYPE: Union[Filter[KafkaMessage], Filter[StreamMessage[Tuple[ConsumerRecord, ...]]]]

middlewares

Optional sequence of middlewares to be applied.

TYPE: Optional[Sequence[Callable[[ConsumerRecord], BaseMiddleware]]]

RETURNS DESCRIPTION
None

None

Source code in faststream/kafka/handler.py
def add_call(
    self,
    *,
    handler: HandlerCallWrapper[ConsumerRecord, P_HandlerParams, T_HandlerReturn],
    dependant: CallModel[P_HandlerParams, T_HandlerReturn],
    parser: CustomParser[
        Union[ConsumerRecord, Tuple[ConsumerRecord, ...]], KafkaMessage
    ],
    decoder: Optional[CustomDecoder[KafkaMessage]],
    filter: Union[
        Filter[KafkaMessage],
        Filter[StreamMessage[Tuple[ConsumerRecord, ...]]],
    ],
    middlewares: Optional[Sequence[Callable[[ConsumerRecord], BaseMiddleware]]],
) -> None:
    """Register a handler call with Kafka-specific parser and decoder defaults.

    The supplied ``parser`` and ``decoder`` are each passed to
    ``resolve_custom_func`` together with the matching ``AioKafkaParser``
    default, and the results are forwarded to the parent ``add_call``.

    Args:
        handler: The wrapped handler function to be called.
        dependant: The dependant model for the handler.
        parser: Custom parser for the incoming record(s).
        decoder: Optional custom decoder for the parsed message.
        filter: Predicate deciding whether this call handles a message.
        middlewares: Optional sequence of middleware factories for this call.

    Returns:
        None

    """
    # Batch mode selects the ``*_batch`` variants of the built-in defaults.
    batch_mode = self.batch

    fallback_parser = (
        AioKafkaParser.parse_message_batch
        if batch_mode
        else AioKafkaParser.parse_message
    )
    resolved_parser = resolve_custom_func(  # type: ignore[type-var]
        parser,  # type: ignore[arg-type]
        fallback_parser,  # type: ignore[arg-type]
    )

    fallback_decoder = (
        AioKafkaParser.decode_message_batch
        if batch_mode
        else AioKafkaParser.decode_message
    )
    resolved_decoder = resolve_custom_func(
        decoder,  # type: ignore[arg-type]
        fallback_decoder,  # type: ignore[arg-type]
    )

    super().add_call(
        handler=handler,
        dependant=dependant,
        parser=resolved_parser,
        decoder=resolved_decoder,
        filter=filter,  # type: ignore[arg-type]
        middlewares=middlewares,
    )

close async #

close() -> None
Source code in faststream/kafka/handler.py
async def close(self) -> None:
    """Close the handler: stop the Kafka consumer and cancel the consuming task.

    The parent ``close()`` is awaited first. The consumer and task steps are
    each skipped when the corresponding attribute is already ``None``, and
    each attribute is reset to ``None`` once handled.
    """
    await super().close()

    active_consumer = self.consumer
    if active_consumer is not None:
        await active_consumer.stop()
        self.consumer = None

    consuming_task = self.task
    if consuming_task is not None:
        consuming_task.cancel()
        self.task = None

consume async #

consume(msg: MsgType) -> SendableMessage

Consume a message asynchronously.

PARAMETER DESCRIPTION
msg

The message to be consumed.

TYPE: MsgType

RETURNS DESCRIPTION
SendableMessage

The sendable message.

RAISES DESCRIPTION
StopConsume

If the consumption needs to be stopped.

RAISES DESCRIPTION
Exception

If an error occurs during consumption.

Source code in faststream/broker/handler.py
@override
async def consume(self, msg: MsgType) -> SendableMessage:  # type: ignore[override]
    """Run a raw message through the registered handler calls.

    For every registered call the message is parsed and decoded, and the
    call's filter is evaluated. The call whose filter accepts the message is
    executed inside the consume scopes of all middlewares, and a non-``None``
    result is published to the response publisher and the handler's own
    publishers.

    ``StopConsume`` raised during processing is not propagated: it closes
    the handler via ``self.close()`` and triggers the handler call.

    Args:
        msg: The raw incoming message; it is passed to each middleware
            factory and to each call's parser.

    Returns:
        The message part of the handler result, or ``None`` when the handler
        is not running or no result was produced.

    Raises:
        HandlerException: Re-raised after ``handler.trigger()``.
        Exception: Any other processing error, re-raised after
            ``handler.trigger(error=...)``.
        AssertionError: With assertions enabled, when a filter accepts a
            message that was already processed by another call, or when no
            call processed the message while the handler is still running.

    """
    result: Optional[WrappedReturn[SendableMessage]] = None
    result_msg: SendableMessage = None

    if not self.running:
        return result_msg

    log_context_tag: Optional["Token[Any]"] = None
    async with AsyncExitStack() as stack:
        stack.enter_context(self.lock)

        stack.enter_context(context.scope("handler_", self))

        gl_middlewares: List[BaseMiddleware] = [
            await stack.enter_async_context(m(msg)) for m in self.global_middlewares
        ]

        logged = False
        processed = False
        for handler, filter_, parser, decoder, middlewares, _ in self.calls:
            local_middlewares: List[BaseMiddleware] = [
                await stack.enter_async_context(m(msg)) for m in middlewares
            ]

            all_middlewares = gl_middlewares + local_middlewares

            # TODO: add parser & decoder caches
            message = await parser(msg)

            if not logged:  # pragma: no branch
                log_context_tag = context.set_local(
                    "log_context",
                    self.log_context_builder(message),
                )
                # Set the log context only once per message: only one token
                # is reset after the loop, so repeating set_local for every
                # call would overwrite the token and leave earlier values
                # un-reset.
                logged = True

            message.decoded_body = await decoder(message)
            message.processed = processed

            if await filter_(message):
                assert (  # nosec B101
                    not processed
                ), "You can't process a message with multiple consumers"

                try:
                    async with AsyncExitStack() as consume_stack:
                        # Each middleware may replace the decoded body before
                        # the handler sees it.
                        for m_consume in all_middlewares:
                            message.decoded_body = (
                                await consume_stack.enter_async_context(
                                    m_consume.consume_scope(message.decoded_body)
                                )
                            )

                        result = await cast(
                            Awaitable[Optional[WrappedReturn[SendableMessage]]],
                            handler.call_wrapped(message),
                        )

                    if result is not None:
                        result_msg, pub_response = result

                        # TODO: suppress all publishing errors and raise them after all publishers will be tried
                        for publisher in (pub_response, *handler._publishers):
                            if publisher is not None:
                                async with AsyncExitStack() as pub_stack:
                                    # Every publisher starts from the original
                                    # handler result, not a previous
                                    # publisher's transformed value.
                                    result_to_send = result_msg

                                    for m_pub in all_middlewares:
                                        result_to_send = (
                                            await pub_stack.enter_async_context(
                                                m_pub.publish_scope(result_to_send)
                                            )
                                        )

                                    await publisher.publish(
                                        message=result_to_send,
                                        correlation_id=message.correlation_id,
                                    )

                except StopConsume:
                    # Handled here rather than propagated: shut the handler down.
                    await self.close()
                    handler.trigger()

                except HandlerException as e:  # pragma: no cover
                    handler.trigger()
                    raise e

                except Exception as e:
                    handler.trigger(error=e)
                    raise e

                else:
                    handler.trigger(result=result[0] if result else None)
                    message.processed = processed = True
                    # NOTE(review): IS_OPTIMIZED presumably mirrors running
                    # without assertions; the remaining calls are then skipped
                    # instead of being checked for a double match - confirm.
                    if IS_OPTIMIZED:  # pragma: no cover
                        break

        assert not self.running or processed, "You have to consume message"  # nosec B101

    if log_context_tag is not None:
        context.reset_local("log_context", log_context_tag)

    return result_msg

get_payloads #

get_payloads() -> List[Tuple[AnyDict, str]]

Get the payloads of the handler.

Source code in faststream/broker/handler.py
def get_payloads(self) -> List[Tuple[AnyDict, str]]:
    """Collect one ``(payload, name)`` pair for each registered call.

    The payload is the result of ``parse_handler_params`` for the call's
    dependant model, prefixed with the handler title (or ``call_name`` when
    no title is set) followed by ``":Message"``. The name is the
    ``__name__`` of the unwrapped original callable passed through
    ``to_camelcase``.
    """
    # The prefix is evaluated per call on purpose: nothing is looked up when
    # there are no registered calls.
    return [
        (
            parse_handler_params(
                model, prefix=f"{self._title or self.call_name}:Message"
            ),
            to_camelcase(unwrap(wrapper._original_call).__name__),
        )
        for wrapper, _, _, _, _, model in self.calls
    ]

get_routing_hash staticmethod #

get_routing_hash(
    topics: Sequence[str], group_id: Optional[str] = None
) -> str
Source code in faststream/kafka/handler.py
@staticmethod
def get_routing_hash(topics: Sequence[str], group_id: Optional[str] = None) -> str:
    return "".join((*topics, group_id or ""))

name #

name() -> str

Returns the name of the API operation.

Source code in faststream/asyncapi/base.py
@abstractproperty
def name(self) -> str:
    """Name of the API operation.

    Abstract: this base implementation always raises ``NotImplementedError``.
    """
    raise NotImplementedError

schema #

schema() -> Dict[str, Channel]

Returns the schema of the API operation as a dictionary of channel names and channel objects.

Source code in faststream/asyncapi/base.py
def schema(self) -> Dict[str, Channel]:  # pragma: no cover
    """Return the API operation schema as a mapping of channel name to channel.

    This implementation always returns a new empty dictionary.
    """
    no_channels: Dict[str, Channel] = {}
    return no_channels

start async #

start(
    **consumer_kwargs: Unpack[ConsumerConnectionParams],
) -> None

Start the consumer.

PARAMETER DESCRIPTION
**consumer_kwargs

Additional keyword arguments to pass to the consumer.

TYPE: Unpack[ConsumerConnectionParams] DEFAULT: {}

RETURNS DESCRIPTION
None

None

Source code in faststream/kafka/handler.py
@override
async def start(  # type: ignore[override]
    self,
    **consumer_kwargs: Unpack[ConsumerConnectionParams],
) -> None:
    """Build and start the Kafka consumer, then launch the consuming task.

    The consumer is created by ``self.builder`` from the stored topics,
    ``group_id`` and ``client_id`` and stored on ``self.consumer``. Once it
    has started, ``self._consume()`` is scheduled as an asyncio task and
    the parent ``start()`` is awaited.

    Args:
        **consumer_kwargs: Additional keyword arguments passed to the builder.

    Returns:
        None

    """
    kafka_consumer = self.builder(
        *self.topics,
        group_id=self.group_id,
        client_id=self.client_id,
        **consumer_kwargs,
    )
    # Stored before starting, so the attribute is set even if start() fails.
    self.consumer = kafka_consumer

    await kafka_consumer.start()

    consuming_task = asyncio.create_task(self._consume())
    self.task = consuming_task

    await super().start()