Skip to content

create_subscriber

faststream.confluent.subscriber.factory.create_subscriber #

create_subscriber(
    *topics: str,
    partitions: Sequence[TopicPartition],
    polling_interval: float,
    batch: Literal[True],
    max_records: Optional[int],
    group_id: Optional[str],
    connection_data: AnyDict,
    is_manual: bool,
    no_ack: bool,
    max_workers: int,
    no_reply: bool,
    retry: bool,
    broker_dependencies: Iterable[Depends],
    broker_middlewares: Sequence[
        BrokerMiddleware[Tuple[Message, ...]]
    ],
    title_: Optional[str],
    description_: Optional[str],
    include_in_schema: bool,
) -> AsyncAPIBatchSubscriber
create_subscriber(
    *topics: str,
    partitions: Sequence[TopicPartition],
    polling_interval: float,
    batch: Literal[False],
    max_records: Optional[int],
    group_id: Optional[str],
    connection_data: AnyDict,
    is_manual: bool,
    no_ack: bool,
    max_workers: int,
    no_reply: bool,
    retry: bool,
    broker_dependencies: Iterable[Depends],
    broker_middlewares: Sequence[BrokerMiddleware[Message]],
    title_: Optional[str],
    description_: Optional[str],
    include_in_schema: bool,
) -> Union[
    AsyncAPIDefaultSubscriber,
    AsyncAPIConcurrentDefaultSubscriber,
]
create_subscriber(
    *topics: str,
    partitions: Sequence[TopicPartition],
    polling_interval: float,
    batch: bool,
    max_records: Optional[int],
    group_id: Optional[str],
    connection_data: AnyDict,
    is_manual: bool,
    no_ack: bool,
    max_workers: int,
    no_reply: bool,
    retry: bool,
    broker_dependencies: Iterable[Depends],
    broker_middlewares: Union[
        Sequence[BrokerMiddleware[Tuple[Message, ...]]],
        Sequence[BrokerMiddleware[Message]],
    ],
    title_: Optional[str],
    description_: Optional[str],
    include_in_schema: bool,
) -> Union[
    AsyncAPIDefaultSubscriber,
    AsyncAPIBatchSubscriber,
    AsyncAPIConcurrentDefaultSubscriber,
]
create_subscriber(
    *topics,
    partitions,
    polling_interval,
    batch,
    max_records,
    group_id,
    connection_data,
    is_manual,
    no_ack,
    max_workers,
    no_reply,
    retry,
    broker_dependencies,
    broker_middlewares,
    title_,
    description_,
    include_in_schema,
)
Source code in faststream/confluent/subscriber/factory.py
def create_subscriber(
    *topics: str,
    partitions: Sequence["TopicPartition"],
    polling_interval: float,
    batch: bool,
    max_records: Optional[int],
    # Kafka information
    group_id: Optional[str],
    connection_data: "AnyDict",
    is_manual: bool,
    # Subscriber args
    no_ack: bool,
    max_workers: int,
    no_reply: bool,
    retry: bool,
    broker_dependencies: Iterable["Depends"],
    broker_middlewares: Union[
        Sequence["BrokerMiddleware[Tuple[ConfluentMsg, ...]]"],
        Sequence["BrokerMiddleware[ConfluentMsg]"],
    ],
    # AsyncAPI args
    title_: Optional[str],
    description_: Optional[str],
    include_in_schema: bool,
) -> Union[
    "AsyncAPIDefaultSubscriber",
    "AsyncAPIBatchSubscriber",
    "AsyncAPIConcurrentDefaultSubscriber",
]:
    if is_manual and max_workers > 1:
        raise SetupError("Max workers not work with manual commit mode.")

    if batch:
        return AsyncAPIBatchSubscriber(
            *topics,
            partitions=partitions,
            polling_interval=polling_interval,
            max_records=max_records,
            group_id=group_id,
            connection_data=connection_data,
            is_manual=is_manual,
            no_ack=no_ack,
            no_reply=no_reply,
            retry=retry,
            broker_dependencies=broker_dependencies,
            broker_middlewares=cast(
                Sequence["BrokerMiddleware[Tuple[ConfluentMsg, ...]]"],
                broker_middlewares,
            ),
            title_=title_,
            description_=description_,
            include_in_schema=include_in_schema,
        )
    else:
        if max_workers > 1:
            return AsyncAPIConcurrentDefaultSubscriber(
                *topics,
                max_workers=max_workers,
                partitions=partitions,
                polling_interval=polling_interval,
                group_id=group_id,
                connection_data=connection_data,
                is_manual=is_manual,
                no_ack=no_ack,
                no_reply=no_reply,
                retry=retry,
                broker_dependencies=broker_dependencies,
                broker_middlewares=cast(
                    Sequence["BrokerMiddleware[ConfluentMsg]"],
                    broker_middlewares,
                ),
                title_=title_,
                description_=description_,
                include_in_schema=include_in_schema,
            )
        else:
            return AsyncAPIDefaultSubscriber(
                *topics,
                partitions=partitions,
                polling_interval=polling_interval,
                group_id=group_id,
                connection_data=connection_data,
                is_manual=is_manual,
                no_ack=no_ack,
                no_reply=no_reply,
                retry=retry,
                broker_dependencies=broker_dependencies,
                broker_middlewares=cast(
                    Sequence["BrokerMiddleware[ConfluentMsg]"],
                    broker_middlewares,
                ),
                title_=title_,
                description_=description_,
                include_in_schema=include_in_schema,
            )