Skip to content

create_subscriber

faststream.kafka.subscriber.factory.create_subscriber #

create_subscriber(*topics: str, batch: Literal[True], batch_timeout_ms: int, max_records: Optional[int], group_id: Optional[str], listener: Optional[ConsumerRebalanceListener], pattern: Optional[str], connection_args: AnyDict, partitions: Iterable[TopicPartition], is_manual: bool, max_workers: int, no_ack: bool, no_reply: bool, retry: bool, broker_dependencies: Iterable[Depends], broker_middlewares: Sequence[BrokerMiddleware[Tuple[ConsumerRecord, ...]]], title_: Optional[str], description_: Optional[str], include_in_schema: bool) -> AsyncAPIBatchSubscriber
create_subscriber(*topics: str, batch: Literal[False], batch_timeout_ms: int, max_records: Optional[int], group_id: Optional[str], listener: Optional[ConsumerRebalanceListener], pattern: Optional[str], connection_args: AnyDict, partitions: Iterable[TopicPartition], is_manual: bool, max_workers: int, no_ack: bool, no_reply: bool, retry: bool, broker_dependencies: Iterable[Depends], broker_middlewares: Sequence[BrokerMiddleware[ConsumerRecord]], title_: Optional[str], description_: Optional[str], include_in_schema: bool) -> AsyncAPIDefaultSubscriber
create_subscriber(*topics: str, batch: bool, batch_timeout_ms: int, max_records: Optional[int], group_id: Optional[str], listener: Optional[ConsumerRebalanceListener], pattern: Optional[str], connection_args: AnyDict, partitions: Iterable[TopicPartition], is_manual: bool, max_workers: int, no_ack: bool, no_reply: bool, retry: bool, broker_dependencies: Iterable[Depends], broker_middlewares: Sequence[BrokerMiddleware[Union[ConsumerRecord, Tuple[ConsumerRecord, ...]]]], title_: Optional[str], description_: Optional[str], include_in_schema: bool) -> Union[AsyncAPIDefaultSubscriber, AsyncAPIBatchSubscriber]
create_subscriber(*topics, batch, batch_timeout_ms, max_records, group_id, listener, pattern, connection_args, partitions, is_manual, max_workers, no_ack, no_reply, retry, broker_dependencies, broker_middlewares, title_, description_, include_in_schema)
Source code in faststream/kafka/subscriber/factory.py
def create_subscriber(
    *topics: str,
    batch: bool,
    batch_timeout_ms: int,
    max_records: Optional[int],
    # Kafka information
    group_id: Optional[str],
    listener: Optional["ConsumerRebalanceListener"],
    pattern: Optional[str],
    connection_args: "AnyDict",
    partitions: Iterable["TopicPartition"],
    is_manual: bool,
    # Subscriber args
    max_workers: int,
    no_ack: bool,
    no_reply: bool,
    retry: bool,
    broker_dependencies: Iterable["Depends"],
    broker_middlewares: Sequence[
        "BrokerMiddleware[Union[ConsumerRecord, Tuple[ConsumerRecord, ...]]]"
    ],
    # AsyncAPI args
    title_: Optional[str],
    description_: Optional[str],
    include_in_schema: bool,
) -> Union[
    "AsyncAPIDefaultSubscriber",
    "AsyncAPIBatchSubscriber",
    "AsyncAPIConcurrentDefaultSubscriber",
]:
    if is_manual and not group_id:
        raise SetupError("You must use `group_id` with manual commit mode.")

    if is_manual and max_workers > 1:
        raise SetupError("Max workers not work with manual commit mode.")

    if not topics and not partitions and not pattern:
        raise SetupError(
            "You should provide either `topics` or `partitions` or `pattern`."
        )
    elif topics and partitions:
        raise SetupError("You can't provide both `topics` and `partitions`.")
    elif topics and pattern:
        raise SetupError("You can't provide both `topics` and `pattern`.")
    elif partitions and pattern:
        raise SetupError("You can't provide both `partitions` and `pattern`.")

    if batch:
        return AsyncAPIBatchSubscriber(
            *topics,
            batch_timeout_ms=batch_timeout_ms,
            max_records=max_records,
            group_id=group_id,
            listener=listener,
            pattern=pattern,
            connection_args=connection_args,
            partitions=partitions,
            is_manual=is_manual,
            no_ack=no_ack,
            no_reply=no_reply,
            retry=retry,
            broker_dependencies=broker_dependencies,
            broker_middlewares=broker_middlewares,
            title_=title_,
            description_=description_,
            include_in_schema=include_in_schema,
        )

    else:
        if max_workers > 1:
            return AsyncAPIConcurrentDefaultSubscriber(
                *topics,
                max_workers=max_workers,
                group_id=group_id,
                listener=listener,
                pattern=pattern,
                connection_args=connection_args,
                partitions=partitions,
                is_manual=is_manual,
                no_ack=no_ack,
                no_reply=no_reply,
                retry=retry,
                broker_dependencies=broker_dependencies,
                broker_middlewares=broker_middlewares,
                title_=title_,
                description_=description_,
                include_in_schema=include_in_schema,
            )
        else:
            return AsyncAPIDefaultSubscriber(
                *topics,
                group_id=group_id,
                listener=listener,
                pattern=pattern,
                connection_args=connection_args,
                partitions=partitions,
                is_manual=is_manual,
                no_ack=no_ack,
                no_reply=no_reply,
                retry=retry,
                broker_dependencies=broker_dependencies,
                broker_middlewares=broker_middlewares,
                title_=title_,
                description_=description_,
                include_in_schema=include_in_schema,
            )