AsyncConfluentConsumer
faststream.confluent.client.AsyncConfluentConsumer #
```python
AsyncConfluentConsumer(
    *topics: str,
    loop: Optional[AbstractEventLoop] = None,
    bootstrap_servers: Union[str, List[str]] = 'localhost',
    client_id: Optional[str] = 'confluent-kafka-consumer',
    group_id: Optional[str] = None,
    group_instance_id: Optional[str] = None,
    key_deserializer: Optional[Callable[[bytes], bytes]] = None,
    value_deserializer: Optional[Callable[[bytes], bytes]] = None,
    fetch_max_wait_ms: int = 500,
    fetch_max_bytes: int = 52428800,
    fetch_min_bytes: int = 1,
    max_partition_fetch_bytes: int = 1 * 1024 * 1024,
    request_timeout_ms: int = 40 * 1000,
    retry_backoff_ms: int = 100,
    auto_offset_reset: str = 'latest',
    enable_auto_commit: bool = True,
    auto_commit_interval_ms: int = 5000,
    check_crcs: bool = True,
    metadata_max_age_ms: int = 5 * 60 * 1000,
    partition_assignment_strategy: Union[str, List[Any]] = 'roundrobin',
    max_poll_interval_ms: int = 300000,
    rebalance_timeout_ms: Optional[int] = None,
    session_timeout_ms: int = 10000,
    heartbeat_interval_ms: int = 3000,
    consumer_timeout_ms: int = 200,
    max_poll_records: Optional[int] = None,
    ssl_context: Optional[SSLContext] = None,
    security_protocol: str = 'PLAINTEXT',
    api_version: str = 'auto',
    exclude_internal_topics: bool = True,
    connections_max_idle_ms: int = 540000,
    isolation_level: str = 'read_uncommitted',
    sasl_mechanism: Optional[str] = None,
    sasl_plain_password: Optional[str] = None,
    sasl_plain_username: Optional[str] = None,
    sasl_kerberos_service_name: str = 'kafka',
    sasl_kerberos_domain_name: Optional[str] = None,
    sasl_oauth_token_provider: Optional[str] = None,
)
```
An asynchronous Python Kafka client for consuming messages using the "confluent-kafka" package.
Initializes the AsyncConfluentConsumer with the given configuration and subscribes to the specified topics.
PARAMETER | DESCRIPTION |
---|---|
`topics` | One or more topic names to subscribe to. TYPE: `str` |
`loop` | The event loop to use for asynchronous operations. TYPE: `Optional[AbstractEventLoop]` |
`bootstrap_servers` | A list of bootstrap servers for Kafka. TYPE: `Union[str, List[str]]` |
`client_id` | A unique identifier for the client. TYPE: `Optional[str]` |
`group_id` | The consumer group ID. TYPE: `Optional[str]` |
`group_instance_id` | A unique identifier for the consumer instance within a group. TYPE: `Optional[str]` |
`key_deserializer` | A callable to deserialize the key. TYPE: `Optional[Callable[[bytes], bytes]]` |
`value_deserializer` | A callable to deserialize the value. TYPE: `Optional[Callable[[bytes], bytes]]` |
`fetch_max_wait_ms` | The maximum time to block waiting for min.bytes data. TYPE: `int` |
`fetch_max_bytes` | The maximum amount of data the server should return for a fetch request. TYPE: `int` |
`fetch_min_bytes` | The minimum amount of data the server should return for a fetch request. TYPE: `int` |
`max_partition_fetch_bytes` | The maximum amount of data per partition the server will return. TYPE: `int` |
`request_timeout_ms` | The maximum time to wait for a request to complete. TYPE: `int` |
`retry_backoff_ms` | The time to back off when a retry is needed. TYPE: `int` |
`auto_offset_reset` | What to do when there is no initial offset in Kafka or the current offset no longer exists. TYPE: `str` |
`enable_auto_commit` | If true, the consumer's offset will be periodically committed in the background. TYPE: `bool` |
`auto_commit_interval_ms` | The frequency in milliseconds at which consumer offsets are auto-committed to Kafka. TYPE: `int` |
`check_crcs` | Automatically check the CRC32 of the records consumed. TYPE: `bool` |
`metadata_max_age_ms` | The maximum age of metadata before a refresh is forced. TYPE: `int` |
`partition_assignment_strategy` | The name of the partition assignment strategy to use. TYPE: `Union[str, List[Any]]` |
`max_poll_interval_ms` | The maximum delay between invocations of poll() when using consumer group management. TYPE: `int` |
`rebalance_timeout_ms` | The maximum time the group coordinator will wait for each member to rejoin when rebalancing. TYPE: `Optional[int]` |
`session_timeout_ms` | The timeout used to detect consumer failures when using Kafka's group management facility. TYPE: `int` |
`heartbeat_interval_ms` | The expected time between heartbeats to the group coordinator when using Kafka's group management facilities. TYPE: `int` |
`consumer_timeout_ms` | The maximum time to block in the consumer waiting for a message. TYPE: `int` |
`max_poll_records` | The maximum number of records returned in a single call to poll(). TYPE: `Optional[int]` |
`ssl_context` | The SSL context for secure connections. TYPE: `Optional[SSLContext]` |
`security_protocol` | The security protocol to use. TYPE: `str` |
`api_version` | The Kafka API version to use. TYPE: `str` |
`exclude_internal_topics` | Whether internal topics (such as offsets) should be excluded from the subscription. TYPE: `bool` |
`connections_max_idle_ms` | The maximum time a connection can be idle. TYPE: `int` |
`isolation_level` | The isolation level for reading data. TYPE: `str` |
`sasl_mechanism` | The SASL mechanism to use for authentication. TYPE: `Optional[str]` |
`sasl_plain_password` | The password for SASL/PLAIN authentication. TYPE: `Optional[str]` |
`sasl_plain_username` | The username for SASL/PLAIN authentication. TYPE: `Optional[str]` |
`sasl_kerberos_service_name` | The Kerberos service name for SASL/GSSAPI. TYPE: `str` |
`sasl_kerberos_domain_name` | The Kerberos domain name for SASL/GSSAPI. TYPE: `Optional[str]` |
`sasl_oauth_token_provider` | The OAuth token provider for SASL/OAUTHBEARER. TYPE: `Optional[str]` |
RAISES | DESCRIPTION |
---|---|
`ValueError` | If the provided `bootstrap_servers` is not a string or a list of strings. |
Source code in faststream/confluent/client.py
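For orientation, a minimal construction sketch. The topic name, broker address, and group id below are placeholders, and only a few of the keyword arguments documented above are shown:

```python
from faststream.confluent.client import AsyncConfluentConsumer

# Placeholder topic, broker address, and group id.
consumer = AsyncConfluentConsumer(
    "orders",
    bootstrap_servers="localhost:9092",
    group_id="billing",
    auto_offset_reset="earliest",  # read from the beginning when no offset is stored
    enable_auto_commit=False,      # commit offsets manually (see commit below)
)
```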
config instance-attribute #

```python
config = {
    'allow.auto.create.topics': True,
    'bootstrap.servers': bootstrap_servers,
    'client.id': client_id,
    'group.id': group_id,
    'group.instance.id': group_instance_id,
    'fetch.wait.max.ms': fetch_max_wait_ms,
    'fetch.max.bytes': fetch_max_bytes,
    'fetch.min.bytes': fetch_min_bytes,
    'max.partition.fetch.bytes': max_partition_fetch_bytes,
    'fetch.error.backoff.ms': retry_backoff_ms,
    'auto.offset.reset': auto_offset_reset,
    'enable.auto.commit': enable_auto_commit,
    'auto.commit.interval.ms': auto_commit_interval_ms,
    'check.crcs': check_crcs,
    'metadata.max.age.ms': metadata_max_age_ms,
    'partition.assignment.strategy': partition_assignment_strategy,
    'max.poll.interval.ms': max_poll_interval_ms,
    'session.timeout.ms': session_timeout_ms,
    'heartbeat.interval.ms': heartbeat_interval_ms,
    'security.protocol': security_protocol.lower(),
    'connections.max.idle.ms': connections_max_idle_ms,
    'isolation.level': isolation_level,
    'sasl.kerberos.service.name': sasl_kerberos_service_name,
}
```
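Since the snake_case constructor arguments are mapped onto the dotted librdkafka-style keys shown above, the resulting configuration can be inspected through this attribute. A small sketch, reusing the `consumer` built in the construction example:

```python
# snake_case kwargs surface under their dotted librdkafka-style keys.
print(consumer.config["group.id"])            # "billing"
print(consumer.config["auto.offset.reset"])   # "earliest"
print(consumer.config["enable.auto.commit"])  # False
```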
commit async #
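No signature is rendered for this method; the sketch below assumes it can be awaited without arguments, as the async marker suggests. With `enable_auto_commit=False`, offsets would be committed manually after processing:

```python
msg = await consumer.getone()
handle(msg.value())      # handle() is a placeholder for your processing logic
await consumer.commit()  # commit the offset only after successful processing
```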
getmany async #

```python
getmany(timeout_ms: int = 0, max_records: Optional[int] = 10) -> Dict[TopicPartition, List[Message]]
```
Consumes a batch of messages from Kafka and groups them by topic and partition.
PARAMETER | DESCRIPTION |
---|---|
`timeout_ms` | The timeout in milliseconds to wait for messages. TYPE: `int` |
`max_records` | The maximum number of messages to return. TYPE: `Optional[int]` |
RETURNS | DESCRIPTION |
---|---|
`Dict[TopicPartition, List[Message]]` | A dictionary where keys are TopicPartition named tuples and values are lists of messages. |
Source code in faststream/confluent/client.py
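A batch-consumption sketch; `process()` is a placeholder, and the attribute access on the keys follows the TopicPartition named-tuple shape described above:

```python
batch = await consumer.getmany(timeout_ms=1000, max_records=100)
for tp, messages in batch.items():
    # Each key identifies one topic/partition; values keep partition order.
    print(f"{tp.topic}[{tp.partition}]: {len(messages)} records")
    for msg in messages:
        process(msg.value())
```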
getone async #

```python
getone(timeout_ms: int = 1000) -> Message
```
Consumes a single message from Kafka.
RETURNS | DESCRIPTION |
---|---|
`Message` | The consumed message. |
Source code in faststream/confluent/client.py
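A simple polling-loop sketch built on getone; `Message` here is the confluent-kafka message type, whose value() returns the raw payload bytes unless a `value_deserializer` was supplied:

```python
while True:
    msg = await consumer.getone(timeout_ms=1000)
    print(msg.topic(), msg.offset(), msg.value())
```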
start async #
Starts the Kafka consumer and subscribes to the specified topics.
Source code in faststream/confluent/client.py
stop async #
Stops the Kafka consumer and releases all resources.
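Putting the lifecycle together, a start/stop sketch that guarantees cleanup even when processing fails; it reuses the placeholder `consumer` from the construction example:

```python
import asyncio

async def main() -> None:
    await consumer.start()  # connect and subscribe to the topics
    try:
        msg = await consumer.getone()
        print(msg.value())
    finally:
        await consumer.stop()  # release the underlying confluent-kafka consumer

asyncio.run(main())
```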