Skip to content

KafkaBroker

faststream.kafka.KafkaBroker #

KafkaBroker(bootstrap_servers='localhost', *, request_timeout_ms=40 * 1000, retry_backoff_ms=100, metadata_max_age_ms=5 * 60 * 1000, connections_max_idle_ms=9 * 60 * 1000, sasl_kerberos_service_name='kafka', sasl_kerberos_domain_name=None, sasl_oauth_token_provider=None, loop=None, client_id=SERVICE_NAME, acks=_missing, key_serializer=None, value_serializer=None, compression_type=None, max_batch_size=16 * 1024, partitioner=DefaultPartitioner(), max_request_size=1024 * 1024, linger_ms=0, enable_idempotence=False, transactional_id=None, transaction_timeout_ms=60 * 1000, graceful_timeout=15.0, decoder=None, parser=None, dependencies=(), middlewares=(), security=None, asyncapi_url=None, protocol=None, protocol_version='auto', description=None, tags=None, logger=EMPTY, log_level=logging.INFO, log_fmt=None, apply_types=True, validate=True, _get_dependant=None, _call_decorators=())

Bases: KafkaRegistrator, KafkaLoggingBroker

Source code in faststream/kafka/broker/broker.py
def __init__(
    self,
    bootstrap_servers: Annotated[
        Union[str, Iterable[str]],
        Doc(
            """
        A `host[:port]` string (or list of `host[:port]` strings) that the consumer should contact to bootstrap
        initial cluster metadata.

        This does not have to be the full node list.
        It just needs to have at least one broker that will respond to a
        Metadata API Request. Default port is 9092.
        """
        ),
    ] = "localhost",
    *,
    # both
    request_timeout_ms: Annotated[
        int,
        Doc("Client request timeout in milliseconds."),
    ] = 40 * 1000,
    retry_backoff_ms: Annotated[
        int,
        Doc("Milliseconds to backoff when retrying on errors."),
    ] = 100,
    metadata_max_age_ms: Annotated[
        int,
        Doc(
            """
        The period of time in milliseconds after
        which we force a refresh of metadata even if we haven't seen any
        partition leadership changes to proactively discover any new
        brokers or partitions.
        """
        ),
    ] = 5 * 60 * 1000,
    connections_max_idle_ms: Annotated[
        int,
        Doc(
            """
         Close idle connections after the number
        of milliseconds specified by this config. Specifying `None` will
        disable idle checks.
        """
        ),
    ] = 9 * 60 * 1000,
    sasl_kerberos_service_name: str = "kafka",
    sasl_kerberos_domain_name: Optional[str] = None,
    sasl_oauth_token_provider: Annotated[
        Optional["AbstractTokenProvider"],
        Doc("OAuthBearer token provider instance."),
    ] = None,
    loop: Optional["AbstractEventLoop"] = None,
    client_id: Annotated[
        Optional[str],
        Doc(
            """
        A name for this client. This string is passed in
        each request to servers and can be used to identify specific
        server-side log entries that correspond to this client. Also
        submitted to :class:`~.consumer.group_coordinator.GroupCoordinator`
        for logging with respect to consumer group administration.
        """
        ),
    ] = SERVICE_NAME,
    # publisher args
    acks: Annotated[
        Union[Literal[0, 1, -1, "all"], object],
        Doc(
            """
        One of ``0``, ``1``, ``all``. The number of acknowledgments
        the producer requires the leader to have received before considering a
        request complete. This controls the durability of records that are
        sent. The following settings are common:

        * ``0``: Producer will not wait for any acknowledgment from the server
          at all. The message will immediately be added to the socket
          buffer and considered sent. No guarantee can be made that the
          server has received the record in this case, and the retries
          configuration will not take effect (as the client won't
          generally know of any failures). The offset given back for each
          record will always be set to -1.
        * ``1``: The broker leader will write the record to its local log but
          will respond without awaiting full acknowledgement from all
          followers. In this case should the leader fail immediately
          after acknowledging the record but before the followers have
          replicated it then the record will be lost.
        * ``all``: The broker leader will wait for the full set of in-sync
          replicas to acknowledge the record. This guarantees that the
          record will not be lost as long as at least one in-sync replica
          remains alive. This is the strongest available guarantee.

        If unset, defaults to ``acks=1``. If `enable_idempotence` is
        :data:`True` defaults to ``acks=all``.
        """
        ),
    ] = _missing,
    key_serializer: Annotated[
        Optional[Callable[[Any], bytes]],
        Doc("Used to convert user-supplied keys to bytes."),
    ] = None,
    value_serializer: Annotated[
        Optional[Callable[[Any], bytes]],
        Doc("used to convert user-supplied message values to bytes."),
    ] = None,
    compression_type: Annotated[
        Optional[Literal["gzip", "snappy", "lz4", "zstd"]],
        Doc(
            """
        The compression type for all data generated bythe producer.
        Compression is of full batches of data, so the efficacy of batching
        will also impact the compression ratio (more batching means better
        compression).
        """
        ),
    ] = None,
    max_batch_size: Annotated[
        int,
        Doc(
            """
        Maximum size of buffered data per partition.
        After this amount `send` coroutine will block until batch is drained.
        """
        ),
    ] = 16 * 1024,
    partitioner: Annotated[
        Callable[
            [bytes, List[Partition], List[Partition]],
            Partition,
        ],
        Doc(
            """
        Callable used to determine which partition
        each message is assigned to. Called (after key serialization):
        ``partitioner(key_bytes, all_partitions, available_partitions)``.
        The default partitioner implementation hashes each non-None key
        using the same murmur2 algorithm as the Java client so that
        messages with the same key are assigned to the same partition.
        When a key is :data:`None`, the message is delivered to a random partition
        (filtered to partitions with available leaders only, if possible).
        """
        ),
    ] = DefaultPartitioner(),
    max_request_size: Annotated[
        int,
        Doc(
            """
        The maximum size of a request. This is also
        effectively a cap on the maximum record size. Note that the server
        has its own cap on record size which may be different from this.
        This setting will limit the number of record batches the producer
        will send in a single request to avoid sending huge requests.
        """
        ),
    ] = 1024 * 1024,
    linger_ms: Annotated[
        int,
        Doc(
            """
        The producer groups together any records that arrive
        in between request transmissions into a single batched request.
        Normally this occurs only under load when records arrive faster
        than they can be sent out. However in some circumstances the client
        may want to reduce the number of requests even under moderate load.
        This setting accomplishes this by adding a small amount of
        artificial delay; that is, if first request is processed faster,
        than `linger_ms`, producer will wait ``linger_ms - process_time``.
        """
        ),
    ] = 0,
    enable_idempotence: Annotated[
        bool,
        Doc(
            """
        When set to `True`, the producer will
        ensure that exactly one copy of each message is written in the
        stream. If `False`, producer retries due to broker failures,
        etc., may write duplicates of the retried message in the stream.
        Note that enabling idempotence acks to set to ``all``. If it is not
        explicitly set by the user it will be chosen.
        """
        ),
    ] = False,
    transactional_id: Optional[str] = None,
    transaction_timeout_ms: int = 60 * 1000,
    # broker base args
    graceful_timeout: Annotated[
        Optional[float],
        Doc(
            "Graceful shutdown timeout. Broker waits for all running subscribers completion before shut down."
        ),
    ] = 15.0,
    decoder: Annotated[
        Optional["CustomCallable"],
        Doc("Custom decoder object."),
    ] = None,
    parser: Annotated[
        Optional["CustomCallable"],
        Doc("Custom parser object."),
    ] = None,
    dependencies: Annotated[
        Iterable["Depends"],
        Doc("Dependencies to apply to all broker subscribers."),
    ] = (),
    middlewares: Annotated[
        Iterable[
            Union[
                "BrokerMiddleware[ConsumerRecord]",
                "BrokerMiddleware[Tuple[ConsumerRecord, ...]]",
            ]
        ],
        Doc("Middlewares to apply to all broker publishers/subscribers."),
    ] = (),
    # AsyncAPI args
    security: Annotated[
        Optional["BaseSecurity"],
        Doc(
            "Security options to connect broker and generate AsyncAPI server security information."
        ),
    ] = None,
    asyncapi_url: Annotated[
        Union[str, Iterable[str], None],
        Doc("AsyncAPI hardcoded server addresses. Use `servers` if not specified."),
    ] = None,
    protocol: Annotated[
        Optional[str],
        Doc("AsyncAPI server protocol."),
    ] = None,
    protocol_version: Annotated[
        Optional[str],
        Doc("AsyncAPI server protocol version."),
    ] = "auto",
    description: Annotated[
        Optional[str],
        Doc("AsyncAPI server description."),
    ] = None,
    tags: Annotated[
        Optional[Iterable[Union["asyncapi.Tag", "asyncapi.TagDict"]]],
        Doc("AsyncAPI server tags."),
    ] = None,
    # logging args
    logger: Annotated[
        Optional["LoggerProto"],
        Doc("User specified logger to pass into Context and log service messages."),
    ] = EMPTY,
    log_level: Annotated[
        int,
        Doc("Service messages log level."),
    ] = logging.INFO,
    log_fmt: Annotated[
        Optional[str],
        Doc("Default logger log format."),
    ] = None,
    # FastDepends args
    apply_types: Annotated[
        bool,
        Doc("Whether to use FastDepends or not."),
    ] = True,
    validate: Annotated[
        bool,
        Doc("Whether to cast types using Pydantic validation."),
    ] = True,
    _get_dependant: Annotated[
        Optional[Callable[..., Any]],
        Doc("Custom library dependant generator callback."),
    ] = None,
    _call_decorators: Annotated[
        Iterable["Decorator"],
        Doc("Any custom decorator to apply to wrapped functions."),
    ] = (),
) -> None:
    """Initialize the Kafka broker.

    Resolves the AsyncAPI protocol name, normalizes `bootstrap_servers`
    and `asyncapi_url` to lists, and forwards every option to the parent
    initializer. Per-argument documentation lives in the `Doc`
    annotations of the signature.
    """
    # An explicit `protocol` always wins; otherwise it is derived from
    # whether the security settings request SSL.
    if protocol is None:
        uses_ssl = security is not None and security.use_ssl
        protocol = "kafka-secure" if uses_ssl else "kafka"

    # A single address string becomes a one-element list.
    if isinstance(bootstrap_servers, str):
        servers = [bootstrap_servers]
    else:
        servers = list(bootstrap_servers)

    # Without explicit AsyncAPI addresses the bootstrap servers are reused.
    if asyncapi_url is None:
        asyncapi_url = servers
    elif isinstance(asyncapi_url, str):
        asyncapi_url = [asyncapi_url]
    else:
        asyncapi_url = list(asyncapi_url)

    super().__init__(
        bootstrap_servers=servers,
        # both args
        client_id=client_id,
        # the same value is also forwarded below as `protocol_version`
        api_version=protocol_version,
        request_timeout_ms=request_timeout_ms,
        retry_backoff_ms=retry_backoff_ms,
        metadata_max_age_ms=metadata_max_age_ms,
        connections_max_idle_ms=connections_max_idle_ms,
        sasl_kerberos_service_name=sasl_kerberos_service_name,
        sasl_kerberos_domain_name=sasl_kerberos_domain_name,
        sasl_oauth_token_provider=sasl_oauth_token_provider,
        loop=loop,
        # publisher args
        acks=acks,
        key_serializer=key_serializer,
        value_serializer=value_serializer,
        compression_type=compression_type,
        max_batch_size=max_batch_size,
        partitioner=partitioner,
        max_request_size=max_request_size,
        linger_ms=linger_ms,
        enable_idempotence=enable_idempotence,
        transactional_id=transactional_id,
        transaction_timeout_ms=transaction_timeout_ms,
        # Basic args
        graceful_timeout=graceful_timeout,
        dependencies=dependencies,
        decoder=decoder,
        parser=parser,
        middlewares=middlewares,
        # AsyncAPI args
        description=description,
        asyncapi_url=asyncapi_url,
        protocol=protocol,
        protocol_version=protocol_version,
        security=security,
        tags=tags,
        # Logging args
        logger=logger,
        log_level=log_level,
        log_fmt=log_fmt,
        # FastDepends args
        _get_dependant=_get_dependant,
        _call_decorators=_call_decorators,
        apply_types=apply_types,
        validate=validate,
    )

    self.client_id = client_id
    # The broker starts without a producer instance.
    self._producer = None

prefix instance-attribute #

prefix = prefix

include_in_schema instance-attribute #

include_in_schema = include_in_schema

logger instance-attribute #

logger

use_custom instance-attribute #

use_custom = True

running instance-attribute #

running = False

graceful_timeout instance-attribute #

graceful_timeout = graceful_timeout

protocol instance-attribute #

protocol = protocol

protocol_version instance-attribute #

protocol_version = protocol_version

description instance-attribute #

description = description

tags instance-attribute #

tags = tags

security instance-attribute #

security = security

url instance-attribute #

url

client_id instance-attribute #

client_id = client_id

setup #

setup()

Prepare all Broker entities to startup.

Source code in faststream/broker/core/usecase.py
def setup(self) -> None:
    """Prepare all Broker entities to startup."""
    for h in self._subscribers.values():
        self.setup_subscriber(h)

    for p in self._publishers.values():
        self.setup_publisher(p)

add_middleware #

add_middleware(middleware)

Append BrokerMiddleware to the end of middlewares list.

The new middleware will be used as the innermost of the already existing ones.

Source code in faststream/broker/core/abc.py
def add_middleware(self, middleware: "BrokerMiddleware[MsgType]") -> None:
    """Register an additional BrokerMiddleware at the end of the middlewares.

    The middleware is appended to the broker's own middleware tuple and is
    then passed to every already registered subscriber and publisher.
    """
    # Rebind to a new tuple rather than mutating the existing sequence.
    self._middlewares = tuple(self._middlewares) + (middleware,)

    for subscriber in self._subscribers.values():
        subscriber.add_middleware(middleware)

    for publisher in self._publishers.values():
        publisher.add_middleware(middleware)

subscriber #

subscriber(*topics, batch=False, group_id=None, key_deserializer=None, value_deserializer=None, fetch_max_bytes=50 * 1024 * 1024, fetch_min_bytes=1, fetch_max_wait_ms=500, max_partition_fetch_bytes=1 * 1024 * 1024, auto_offset_reset='latest', auto_commit=True, auto_commit_interval_ms=5 * 1000, check_crcs=True, partition_assignment_strategy=(RoundRobinPartitionAssignor,), max_poll_interval_ms=5 * 60 * 1000, rebalance_timeout_ms=None, session_timeout_ms=10 * 1000, heartbeat_interval_ms=3 * 1000, consumer_timeout_ms=200, max_poll_records=None, exclude_internal_topics=True, isolation_level='read_uncommitted', batch_timeout_ms=200, max_records=None, listener=None, pattern=None, partitions=(), dependencies=(), parser=None, decoder=None, middlewares=(), filter=default_filter, retry=False, no_ack=False, no_reply=False, title=None, description=None, include_in_schema=True)
Source code in faststream/kafka/broker/registrator.py (lines 1213–1657)
@override
def subscriber(
    self,
    *topics: Annotated[
        str,
        Doc("Kafka topics to consume messages from."),
    ],
    batch: Annotated[
        bool,
        Doc("Whether to consume messages in batches or not."),
    ] = False,
    group_id: Annotated[
        Optional[str],
        Doc(
            """
        Name of the consumer group to join for dynamic
        partition assignment (if enabled), and to use for fetching and
        committing offsets. If `None`, auto-partition assignment (via
        group coordinator) and offset commits are disabled.
        """
        ),
    ] = None,
    key_deserializer: Annotated[
        Optional[Callable[[bytes], Any]],
        Doc(
            "Any callable that takes a raw message `bytes` "
            "key and returns a deserialized one."
        ),
    ] = None,
    value_deserializer: Annotated[
        Optional[Callable[[bytes], Any]],
        Doc(
            "Any callable that takes a raw message `bytes` "
            "value and returns a deserialized value."
        ),
    ] = None,
    fetch_max_bytes: Annotated[
        int,
        Doc(
            """
        The maximum amount of data the server should
        return for a fetch request. This is not an absolute maximum, if
        the first message in the first non-empty partition of the fetch
        is larger than this value, the message will still be returned
        to ensure that the consumer can make progress. NOTE: consumer
        performs fetches to multiple brokers in parallel so memory
        usage will depend on the number of brokers containing
        partitions for the topic.
        """
        ),
    ] = 50 * 1024 * 1024,
    fetch_min_bytes: Annotated[
        int,
        Doc(
            """
        Minimum amount of data the server should
        return for a fetch request, otherwise wait up to
        `fetch_max_wait_ms` for more data to accumulate.
        """
        ),
    ] = 1,
    fetch_max_wait_ms: Annotated[
        int,
        Doc(
            """
        The maximum amount of time in milliseconds
        the server will block before answering the fetch request if
        there isn't sufficient data to immediately satisfy the
        requirement given by `fetch_min_bytes`.
        """
        ),
    ] = 500,
    max_partition_fetch_bytes: Annotated[
        int,
        Doc(
            """
        The maximum amount of data
        per-partition the server will return. The maximum total memory
        used for a request ``= #partitions * max_partition_fetch_bytes``.
        This size must be at least as large as the maximum message size
        the server allows or else it is possible for the producer to
        send messages larger than the consumer can fetch. If that
        happens, the consumer can get stuck trying to fetch a large
        message on a certain partition.
        """
        ),
    ] = 1 * 1024 * 1024,
    auto_offset_reset: Annotated[
        Literal["latest", "earliest", "none"],
        Doc(
            """
        A policy for resetting offsets on `OffsetOutOfRangeError` errors:

        * `earliest` will move to the oldest available message
        * `latest` will move to the most recent
        * `none` will raise an exception so you can handle this case
        """
        ),
    ] = "latest",
    auto_commit: Annotated[
        bool,
        Doc(
            """
        If `True` the consumer's offset will be
        periodically committed in the background.
        """
        ),
    ] = True,
    auto_commit_interval_ms: Annotated[
        int,
        Doc(
            """
        Milliseconds between automatic
        offset commits, if `auto_commit` is `True`."""
        ),
    ] = 5 * 1000,
    check_crcs: Annotated[
        bool,
        Doc(
            """
        Automatically check the CRC32 of the records
        consumed. This ensures no on-the-wire or on-disk corruption to
        the messages occurred. This check adds some overhead, so it may
        be disabled in cases seeking extreme performance.
        """
        ),
    ] = True,
    partition_assignment_strategy: Annotated[
        Sequence["AbstractPartitionAssignor"],
        Doc(
            """
        List of objects to use to
        distribute partition ownership amongst consumer instances when
        group management is used. This preference is implicit in the order
        of the strategies in the list. When assignment strategy changes:
        to support a change to the assignment strategy, new versions must
        enable support both for the old assignment strategy and the new
        one. The coordinator will choose the old assignment strategy until
        all members have been updated. Then it will choose the new
        strategy.
        """
        ),
    ] = (RoundRobinPartitionAssignor,),
    max_poll_interval_ms: Annotated[
        int,
        Doc(
            """
        Maximum allowed time between calls to
        consume messages in batches. If this interval
        is exceeded the consumer is considered failed and the group will
        rebalance in order to reassign the partitions to another consumer
        group member. If API methods block waiting for messages, that time
        does not count against this timeout.
        """
        ),
    ] = 5 * 60 * 1000,
    rebalance_timeout_ms: Annotated[
        Optional[int],
        Doc(
            """
        The maximum time server will wait for this
        consumer to rejoin the group in a case of rebalance. In Java client
        this behaviour is bound to `max.poll.interval.ms` configuration,
        but as ``aiokafka`` will rejoin the group in the background, we
        decouple this setting to allow finer tuning by users that use
        `ConsumerRebalanceListener` to delay rebalacing. Defaults
        to ``session_timeout_ms``
        """
        ),
    ] = None,
    session_timeout_ms: Annotated[
        int,
        Doc(
            """
        Client group session and failure detection
        timeout. The consumer sends periodic heartbeats
        (`heartbeat.interval.ms`) to indicate its liveness to the broker.
        If no hearts are received by the broker for a group member within
        the session timeout, the broker will remove the consumer from the
        group and trigger a rebalance. The allowed range is configured with
        the **broker** configuration properties
        `group.min.session.timeout.ms` and `group.max.session.timeout.ms`.
        """
        ),
    ] = 10 * 1000,
    heartbeat_interval_ms: Annotated[
        int,
        Doc(
            """
        The expected time in milliseconds
        between heartbeats to the consumer coordinator when using
        Kafka's group management feature. Heartbeats are used to ensure
        that the consumer's session stays active and to facilitate
        rebalancing when new consumers join or leave the group. The
        value must be set lower than `session_timeout_ms`, but typically
        should be set no higher than 1/3 of that value. It can be
        adjusted even lower to control the expected time for normal
        rebalances.
        """
        ),
    ] = 3 * 1000,
    consumer_timeout_ms: Annotated[
        int,
        Doc(
            """
        Maximum wait timeout for background fetching
        routine. Mostly defines how fast the system will see rebalance and
        request new data for new partitions.
        """
        ),
    ] = 200,
    max_poll_records: Annotated[
        Optional[int],
        Doc(
            """
        The maximum number of records returned in a
        single call by batch consumer. Has no limit by default.
        """
        ),
    ] = None,
    exclude_internal_topics: Annotated[
        bool,
        Doc(
            """
        Whether records from internal topics
        (such as offsets) should be exposed to the consumer. If set to True
        the only way to receive records from an internal topic is
        subscribing to it.
        """
        ),
    ] = True,
    isolation_level: Annotated[
        Literal["read_uncommitted", "read_committed"],
        Doc(
            """
        Controls how to read messages written
        transactionally.

        * `read_committed`, batch consumer will only return
        transactional messages which have been committed.

        * `read_uncommitted` (the default), batch consumer will
        return all messages, even transactional messages which have been
        aborted.

        Non-transactional messages will be returned unconditionally in
        either mode.

        Messages will always be returned in offset order. Hence, in
        `read_committed` mode, batch consumer will only return
        messages up to the last stable offset (LSO), which is the one less
        than the offset of the first open transaction. In particular any
        messages appearing after messages belonging to ongoing transactions
        will be withheld until the relevant transaction has been completed.
        As a result, `read_committed` consumers will not be able to read up
        to the high watermark when there are in flight transactions.
        Further, when in `read_committed` the seek_to_end method will
        return the LSO. See method docs below.
        """
        ),
    ] = "read_uncommitted",
    batch_timeout_ms: Annotated[
        int,
        Doc(
            """
        Milliseconds spent waiting if
        data is not available in the buffer. If 0, returns immediately
        with any records that are available currently in the buffer,
        else returns empty.
        """
        ),
    ] = 200,
    max_records: Annotated[
        Optional[int],
        Doc("Number of messages to consume as one batch."),
    ] = None,
    listener: Annotated[
        Optional["ConsumerRebalanceListener"],
        Doc(
            """
        Optionally include listener
           callback, which will be called before and after each rebalance
           operation.
           As part of group management, the consumer will keep track of
           the list of consumers that belong to a particular group and
           will trigger a rebalance operation if one of the following
           events trigger:

           * Number of partitions change for any of the subscribed topics
           * Topic is created or deleted
           * An existing member of the consumer group dies
           * A new member is added to the consumer group

           When any of these events are triggered, the provided listener
           will be invoked first to indicate that the consumer's
           assignment has been revoked, and then again when the new
           assignment has been received. Note that this listener will
           immediately override any listener set in a previous call
           to subscribe. It is guaranteed, however, that the partitions
           revoked/assigned
           through this interface are from topics subscribed in this call.
        """
        ),
    ] = None,
    pattern: Annotated[
        Optional[str],
        Doc(
            """
        Pattern to match available topics. You must provide either topics or pattern, but not both.
        """
        ),
    ] = None,
    partitions: Annotated[
        Iterable["TopicPartition"],
        Doc(
            """
        An explicit partitions list to assign.
        You can't use 'topics' and 'partitions' in the same time.
        """
        ),
    ] = (),
    # broker args
    dependencies: Annotated[
        Iterable["Depends"],
        Doc("Dependencies list (`[Depends(),]`) to apply to the subscriber."),
    ] = (),
    parser: Annotated[
        Optional["CustomCallable"],
        Doc("Parser to map original **ConsumerRecord** object to FastStream one."),
    ] = None,
    decoder: Annotated[
        Optional["CustomCallable"],
        Doc("Function to decode FastStream msg bytes body to python objects."),
    ] = None,
    middlewares: Annotated[
        Iterable["SubscriberMiddleware[KafkaMessage]"],
        Doc("Subscriber middlewares to wrap incoming message processing."),
    ] = (),
    filter: Annotated[
        "Filter[KafkaMessage]",
        Doc(
            "Overload subscriber to consume various messages from the same source."
        ),
        deprecated(
            "Deprecated in **FastStream 0.5.0**. "
            "Please, create `subscriber` object and use it explicitly instead. "
            "Argument will be removed in **FastStream 0.6.0**."
        ),
    ] = default_filter,
    retry: Annotated[
        bool,
        Doc("Whether to `nack` message at processing exception."),
    ] = False,
    no_ack: Annotated[
        bool,
        Doc("Whether to disable **FastStream** autoacknowledgement logic or not."),
    ] = False,
    no_reply: Annotated[
        bool,
        Doc(
            "Whether to disable **FastStream** RPC and Reply To auto responses or not."
        ),
    ] = False,
    # AsyncAPI args
    title: Annotated[
        Optional[str],
        Doc("AsyncAPI subscriber object title."),
    ] = None,
    description: Annotated[
        Optional[str],
        Doc(
            "AsyncAPI subscriber object description. "
            "Uses decorated docstring as default."
        ),
    ] = None,
    include_in_schema: Annotated[
        bool,
        Doc("Whetever to include operation in AsyncAPI schema or not."),
    ] = True,
) -> Union[
    "AsyncAPIDefaultSubscriber",
    "AsyncAPIBatchSubscriber",
]:
    """Create a Kafka subscriber, register it and attach the call options.

    A subscriber is built with `create_subscriber`, registered through the
    parent `subscriber` method, and the result of its `add_call` (which
    receives the filter, parser, decoder, dependencies and middlewares)
    is returned. Per-argument documentation lives in the `Doc`
    annotations of the signature.
    """
    # Consumer-level options are handed over as one mapping.
    connection_args = {
        "key_deserializer": key_deserializer,
        "value_deserializer": value_deserializer,
        "fetch_max_wait_ms": fetch_max_wait_ms,
        "fetch_max_bytes": fetch_max_bytes,
        "fetch_min_bytes": fetch_min_bytes,
        "max_partition_fetch_bytes": max_partition_fetch_bytes,
        "auto_offset_reset": auto_offset_reset,
        # the public `auto_commit` argument is passed on under this key
        "enable_auto_commit": auto_commit,
        "auto_commit_interval_ms": auto_commit_interval_ms,
        "check_crcs": check_crcs,
        "partition_assignment_strategy": partition_assignment_strategy,
        "max_poll_interval_ms": max_poll_interval_ms,
        "rebalance_timeout_ms": rebalance_timeout_ms,
        "session_timeout_ms": session_timeout_ms,
        "heartbeat_interval_ms": heartbeat_interval_ms,
        "consumer_timeout_ms": consumer_timeout_ms,
        "max_poll_records": max_poll_records,
        "exclude_internal_topics": exclude_internal_topics,
        "isolation_level": isolation_level,
    }

    subscriber = super().subscriber(
        create_subscriber(
            *topics,
            batch=batch,
            batch_timeout_ms=batch_timeout_ms,
            max_records=max_records,
            group_id=group_id,
            listener=listener,
            pattern=pattern,
            connection_args=connection_args,
            partitions=partitions,
            # manual mode is the inverse of auto commit
            is_manual=not auto_commit,
            # subscriber args
            no_ack=no_ack,
            no_reply=no_reply,
            retry=retry,
            broker_middlewares=self._middlewares,
            broker_dependencies=self._dependencies,
            # AsyncAPI
            title_=title,
            description_=description,
            include_in_schema=self._solve_include_in_schema(include_in_schema),
        )
    )

    # `cast` only informs the type checker; both modes attach the call
    # with exactly the same options.
    typed_subscriber: Union[
        "AsyncAPIDefaultSubscriber",
        "AsyncAPIBatchSubscriber",
    ]
    if batch:
        typed_subscriber = cast("AsyncAPIBatchSubscriber", subscriber)
    else:
        typed_subscriber = cast("AsyncAPIDefaultSubscriber", subscriber)

    # Subscriber-level parser/decoder fall back to the broker-level ones.
    return typed_subscriber.add_call(
        filter_=filter,
        parser_=parser or self._parser,
        decoder_=decoder or self._decoder,
        dependencies_=dependencies,
        middlewares_=middlewares,
    )

publisher #

publisher(topic, *, key=None, partition=None, headers=None, reply_to='', batch=False, middlewares=(), title=None, description=None, schema=None, include_in_schema=True)

Creates long-living and AsyncAPI-documented publisher object.

You can use it as a handler decorator (handler should be decorated by @broker.subscriber(...) too) - @broker.publisher(...). In such case publisher will publish your handler return value.

Or you can create a publisher object and call it later: broker.publisher(...).publish(...).

Source code in faststream/kafka/broker/registrator.py
@override
def publisher(
    self,
    topic: Annotated[
        str,
        Doc("Topic where the message will be published."),
    ],
    *,
    key: Annotated[
        Union[bytes, Any, None],
        Doc(
            """
        A key to associate with the message. Can be used to
        determine which partition to send the message to. If partition
        is `None` (and producer's partitioner config is left as default),
        then messages with the same key will be delivered to the same
        partition (but if key is `None`, partition is chosen randomly).
        Must be type `bytes`, or be serializable to bytes via configured
        `key_serializer`.
        """
        ),
    ] = None,
    partition: Annotated[
        Optional[int],
        Doc(
            """
        Specify a partition. If not set, the partition will be
        selected using the configured `partitioner`.
        """
        ),
    ] = None,
    headers: Annotated[
        Optional[Dict[str, str]],
        Doc(
            "Message headers to store metainformation. "
            "**content-type** and **correlation_id** will be set automatically by framework anyway. "
            "Can be overridden by `publish.headers` if specified."
        ),
    ] = None,
    reply_to: Annotated[
        str,
        Doc("Topic name to send response."),
    ] = "",
    batch: Annotated[
        bool,
        Doc("Whether to send messages in batches or not."),
    ] = False,
    # basic args
    middlewares: Annotated[
        Iterable["PublisherMiddleware"],
        Doc("Publisher middlewares to wrap outgoing messages."),
    ] = (),
    # AsyncAPI args
    title: Annotated[
        Optional[str],
        Doc("AsyncAPI publisher object title."),
    ] = None,
    description: Annotated[
        Optional[str],
        Doc("AsyncAPI publisher object description."),
    ] = None,
    schema: Annotated[
        Optional[Any],
        Doc(
            "AsyncAPI publishing message type. "
            "Should be any python-native object annotation or `pydantic.BaseModel`."
        ),
    ] = None,
    include_in_schema: Annotated[
        bool,
        Doc("Whetever to include operation in AsyncAPI schema or not."),
    ] = True,
) -> Union[
    "AsyncAPIBatchPublisher",
    "AsyncAPIDefaultPublisher",
]:
    """Create a long-living, AsyncAPI-documented publisher object.

    The result can be used as a handler decorator - `@broker.publisher(...)` -
    on a handler that is also decorated with `@broker.subscriber(...)`; the
    publisher then publishes the handler's return value.

    Alternatively, keep the returned object and call it later -
    `broker.publisher(...).publish(...)`.

    Returns:
        The object returned by the parent class's `publisher(...)`
        registration call, typed as a batch or default publisher
        according to `batch`.
    """
    # Resolve the schema flag up front so the factory call below only
    # forwards already-computed values.
    in_schema = self._solve_include_in_schema(include_in_schema)

    created = AsyncAPIPublisher.create(
        # batch flag
        batch=batch,
        # default args
        key=key,
        # both args
        topic=topic,
        partition=partition,
        headers=headers,
        reply_to=reply_to,
        # publisher-specific
        broker_middlewares=self._middlewares,
        middlewares=middlewares,
        # AsyncAPI
        title_=title,
        description_=description,
        schema_=schema,
        include_in_schema=in_schema,
    )

    # Registration is identical for both flavours; only the static type
    # reported to the caller depends on `batch`.
    registered = super().publisher(created)

    if batch:
        return cast("AsyncAPIBatchPublisher", registered)
    return cast("AsyncAPIDefaultPublisher", registered)

include_router #

include_router(router, *, prefix='', dependencies=(), middlewares=(), include_in_schema=None)

Includes a router in the current object.

Source code in faststream/broker/core/abc.py
def include_router(
    self,
    router: "ABCBroker[Any]",
    *,
    prefix: str = "",
    dependencies: Iterable["Depends"] = (),
    middlewares: Iterable["BrokerMiddleware[MsgType]"] = (),
    include_in_schema: Optional[bool] = None,
) -> None:
    """Include a router's subscribers and publishers in the current object.

    Args:
        router: Object whose `_subscribers` and `_publishers` are merged
            into this one.
        prefix: Extra prefix appended to this object's own `prefix` and
            applied to every subscriber and publisher of `router`.
        dependencies: Dependencies inserted between this object's
            dependencies and each subscriber's own ones.
        middlewares: Middlewares inserted between this object's middlewares
            and each subscriber's / publisher's own ones.
        include_in_schema: When not `None`, forces the schema flag of every
            newly included endpoint; otherwise each endpoint's current flag
            is passed through `_solve_include_in_schema`.
    """
    # The same combined prefix is used for subscribers AND publishers, so
    # the `prefix` argument is no longer ignored for publishers.
    full_prefix = "".join((self.prefix, prefix))

    # Materialize once: both values are unpacked for every endpoint below,
    # and a one-shot iterator would be empty after the first endpoint.
    dependencies = tuple(dependencies)
    middlewares = tuple(middlewares)

    for h in router._subscribers.values():
        # The prefix is applied before the duplicate check because it
        # may influence the subscriber's hash.
        h.add_prefix(full_prefix)

        if (key := hash(h)) not in self._subscribers:
            if include_in_schema is None:
                h.include_in_schema = self._solve_include_in_schema(
                    h.include_in_schema
                )
            else:
                h.include_in_schema = include_in_schema

            h._broker_middlewares = (
                *self._middlewares,
                *middlewares,
                *h._broker_middlewares,
            )
            h._broker_dependencies = (
                *self._dependencies,
                *dependencies,
                *h._broker_dependencies,
            )
            # Rebind to a new dict instead of mutating the existing one.
            self._subscribers = {**self._subscribers, key: h}

    for p in router._publishers.values():
        p.add_prefix(full_prefix)

        if (key := hash(p)) not in self._publishers:
            if include_in_schema is None:
                p.include_in_schema = self._solve_include_in_schema(
                    p.include_in_schema
                )
            else:
                p.include_in_schema = include_in_schema

            p._broker_middlewares = (
                *self._middlewares,
                *middlewares,
                *p._broker_middlewares,
            )
            self._publishers = {**self._publishers, key: p}

include_routers #

include_routers(*routers)

Includes routers in the object.

Source code in faststream/broker/core/abc.py
def include_routers(
    self,
    *routers: "ABCBroker[MsgType]",
) -> None:
    """Include each of the given routers, in the order they were passed.

    Every router is forwarded to `include_router` with no extra options.
    """
    for router in routers:
        self.include_router(router)

get_fmt #

get_fmt()
Source code in faststream/kafka/broker/logging.py
def get_fmt(self) -> str:
    """Build the %-style logging format string.

    The topic and message id columns are always present and padded to the
    widths stored on the instance; the group id column is emitted only when
    `_max_group_len` is truthy.
    """
    topic_column = f"%(topic)-{self._max_topic_len}s | "
    group_column = (
        f"%(group_id)-{self._max_group_len}s | " if self._max_group_len else ""
    )
    message_id_column = f"%(message_id)-{self.__max_msg_id_ln}s "

    return "".join(
        (
            "%(asctime)s %(levelname)-8s - ",
            topic_column,
            group_column,
            message_id_column,
            "- %(message)s",
        )
    )

setup_subscriber #

setup_subscriber(subscriber, **kwargs)

Set up the Subscriber to prepare it for starting.

Source code in faststream/broker/core/usecase.py
def setup_subscriber(
    self,
    subscriber: SubscriberProto[MsgType],
    **kwargs: Any,
) -> None:
    """Set up the subscriber to prepare it for starting.

    The options from `_subscriber_setup_extra` are combined with `kwargs`
    (explicit `kwargs` win on key clashes) and passed to `subscriber.setup`.
    """
    # Build a fresh dict so the stored defaults are never mutated.
    options = {**self._subscriber_setup_extra, **kwargs}
    subscriber.setup(**options)

setup_publisher #

setup_publisher(publisher, **kwargs)

Set up the Publisher to prepare it for starting.

Source code in faststream/broker/core/usecase.py
def setup_publisher(
    self,
    publisher: "PublisherProto[MsgType]",
    **kwargs: Any,
) -> None:
    """Set up the publisher to prepare it for starting.

    The options from `_publisher_setup_extra` are combined with `kwargs`
    (explicit `kwargs` win on key clashes) and passed to `publisher.setup`.
    """
    # Build a fresh dict so the stored defaults are never mutated.
    options = {**self._publisher_setup_extra, **kwargs}
    publisher.setup(**options)

close async #

close(exc_type=None, exc_val=None, exc_tb=None)

Closes the object.

Source code in faststream/broker/core/usecase.py
async def close(
    self,
    exc_type: Optional[Type[BaseException]] = None,
    exc_val: Optional[BaseException] = None,
    exc_tb: Optional["TracebackType"] = None,
) -> None:
    """Close the object.

    Marks the object as not running, closes every registered subscriber in
    turn and, if a connection is present, forwards the exception triple to
    `_close`.
    """
    self.running = False

    for subscriber in self._subscribers.values():
        await subscriber.close()

    # Nothing more to release when no connection is stored.
    if self._connection is None:
        return

    await self._close(exc_type, exc_val, exc_tb)

connect async #

connect(bootstrap_servers=EMPTY, **kwargs)

Connect to Kafka servers manually.

Accepts the same arguments as KafkaBroker.__init__ and overrides them. To start the subscribers as well, use broker.start() after, or instead of, this method.

Source code in faststream/kafka/broker/broker.py
@override
async def connect(  # type: ignore[override]
    self,
    bootstrap_servers: Annotated[
        Union[str, Iterable[str]],
        Doc("Kafka addresses to connect."),
    ] = EMPTY,
    **kwargs: "Unpack[KafkaInitKwargs]",
) -> Callable[..., aiokafka.AIOKafkaConsumer]:
    """Connect to Kafka servers manually.

    Accepts the same arguments as `KafkaBroker.__init__` and overrides them.
    To start the subscribers as well, use `broker.start()` after, or instead
    of, this method.
    """
    connect_kwargs: AnyDict = {**kwargs}

    # Only forward the addresses when the caller actually supplied them, so
    # the parent `connect` does not receive the `EMPTY` sentinel.
    if bootstrap_servers is not EMPTY:
        connect_kwargs["bootstrap_servers"] = bootstrap_servers

    return await super().connect(**connect_kwargs)

start async #

start()

Connect broker to Kafka and startup all subscribers.

Source code in faststream/kafka/broker/broker.py
async def start(self) -> None:
    """Connect broker to Kafka and startup all subscribers."""
    await super().start()

    for subscriber in self._subscribers.values():
        # Log before starting, so the line is emitted even if `start()`
        # then fails for this subscriber.
        message = f"`{subscriber.call_name}` waiting for messages"
        log_context = subscriber.get_log_context(None)
        self._log(message, extra=log_context)

        await subscriber.start()

publish async #

publish(message, topic, *, key=None, partition=None, timestamp_ms=None, headers=None, correlation_id=None, reply_to='', no_confirm=False, **kwargs)

Publish message directly.

This method allows you to publish a message in a way that is not documented by AsyncAPI. You can use it in applications built with other frameworks, or to publish messages from time to time.

For regular use, please prefer @broker.publisher(...) or broker.publisher(...).publish(...) instead.

Source code in faststream/kafka/broker/broker.py
@override
async def publish(  # type: ignore[override]
    self,
    message: Annotated[
        "SendableMessage",
        Doc("Message body to send."),
    ],
    topic: Annotated[
        str,
        Doc("Topic where the message will be published."),
    ],
    *,
    key: Annotated[
        Union[bytes, Any, None],
        Doc(
            """
        A key to associate with the message. Can be used to
        determine which partition to send the message to. If partition
        is `None` (and producer's partitioner config is left as default),
        then messages with the same key will be delivered to the same
        partition (but if key is `None`, partition is chosen randomly).
        Must be type `bytes`, or be serializable to bytes via configured
        `key_serializer`.
        """
        ),
    ] = None,
    partition: Annotated[
        Optional[int],
        Doc(
            """
        Specify a partition. If not set, the partition will be
        selected using the configured `partitioner`.
        """
        ),
    ] = None,
    timestamp_ms: Annotated[
        Optional[int],
        Doc(
            """
        Epoch milliseconds (from Jan 1 1970 UTC) to use as
        the message timestamp. Defaults to current time.
        """
        ),
    ] = None,
    headers: Annotated[
        Optional[Dict[str, str]],
        Doc("Message headers to store metainformation."),
    ] = None,
    correlation_id: Annotated[
        Optional[str],
        Doc(
            "Manual message **correlation_id** setter. "
            "**correlation_id** is a useful option to trace messages."
        ),
    ] = None,
    reply_to: Annotated[
        str,
        Doc("Reply message topic name to send response."),
    ] = "",
    no_confirm: Annotated[
        bool,
        Doc("Do not wait for Kafka publish confirmation."),
    ] = False,
    # extra options to be compatible with test client
    **kwargs: Any,
) -> Optional[Any]:
    """Publish a message directly.

    This method publishes a message in a way that is not documented by
    AsyncAPI. Use it from applications built with other frameworks, or to
    publish messages from time to time.

    For regular use, prefer `@broker.publisher(...)` or
    `broker.publisher(...).publish(...)`.

    Returns:
        Whatever the parent class's `publish` returns.
    """
    # A missing or empty correlation id is replaced by a generated one.
    if not correlation_id:
        correlation_id = gen_cor_id()

    published = await super().publish(
        message,
        producer=self._producer,
        topic=topic,
        key=key,
        partition=partition,
        timestamp_ms=timestamp_ms,
        headers=headers,
        correlation_id=correlation_id,
        reply_to=reply_to,
        no_confirm=no_confirm,
        **kwargs,
    )
    return published

request async #

request(message, topic, *, key=None, partition=None, timestamp_ms=None, headers=None, correlation_id=None, timeout=0.5)
Source code in faststream/kafka/broker/broker.py
@override
async def request(  # type: ignore[override]
    self,
    message: Annotated[
        "SendableMessage",
        Doc("Message body to send."),
    ],
    topic: Annotated[
        str,
        Doc("Topic where the message will be published."),
    ],
    *,
    key: Annotated[
        Union[bytes, Any, None],
        Doc(
            """
        A key to associate with the message. Can be used to
        determine which partition to send the message to. If partition
        is `None` (and producer's partitioner config is left as default),
        then messages with the same key will be delivered to the same
        partition (but if key is `None`, partition is chosen randomly).
        Must be type `bytes`, or be serializable to bytes via configured
        `key_serializer`.
        """
        ),
    ] = None,
    partition: Annotated[
        Optional[int],
        Doc(
            """
        Specify a partition. If not set, the partition will be
        selected using the configured `partitioner`.
        """
        ),
    ] = None,
    timestamp_ms: Annotated[
        Optional[int],
        Doc(
            """
        Epoch milliseconds (from Jan 1 1970 UTC) to use as
        the message timestamp. Defaults to current time.
        """
        ),
    ] = None,
    headers: Annotated[
        Optional[Dict[str, str]],
        Doc("Message headers to store metainformation."),
    ] = None,
    correlation_id: Annotated[
        Optional[str],
        Doc(
            "Manual message **correlation_id** setter. "
            "**correlation_id** is a useful option to trace messages."
        ),
    ] = None,
    timeout: Annotated[
        float,
        Doc("Timeout to send RPC request."),
    ] = 0.5,
) -> Optional[Any]:
    """Send a request message through the parent class's `request`.

    The call is forwarded with this broker's producer and a correlation id
    that is generated when the caller did not supply one.

    Returns:
        Whatever the parent class's `request` returns.
    """
    # A missing or empty correlation id is replaced by a generated one.
    if not correlation_id:
        correlation_id = gen_cor_id()

    response = await super().request(
        message,
        producer=self._producer,
        topic=topic,
        key=key,
        partition=partition,
        timestamp_ms=timestamp_ms,
        headers=headers,
        correlation_id=correlation_id,
        timeout=timeout,
    )
    return response

publish_batch async #

publish_batch(*msgs, topic, partition=None, timestamp_ms=None, headers=None, reply_to='', correlation_id=None, no_confirm=False)
Source code in faststream/kafka/broker/broker.py
async def publish_batch(
    self,
    *msgs: Annotated[
        "SendableMessage",
        Doc("Messages bodies to send."),
    ],
    topic: Annotated[
        str,
        Doc("Topic where the message will be published."),
    ],
    partition: Annotated[
        Optional[int],
        Doc(
            """
        Specify a partition. If not set, the partition will be
        selected using the configured `partitioner`.
        """
        ),
    ] = None,
    timestamp_ms: Annotated[
        Optional[int],
        Doc(
            """
        Epoch milliseconds (from Jan 1 1970 UTC) to use as
        the message timestamp. Defaults to current time.
        """
        ),
    ] = None,
    headers: Annotated[
        Optional[Dict[str, str]],
        Doc("Messages headers to store metainformation."),
    ] = None,
    reply_to: Annotated[
        str,
        Doc("Reply message topic name to send response."),
    ] = "",
    correlation_id: Annotated[
        Optional[str],
        Doc(
            "Manual message **correlation_id** setter. "
            "**correlation_id** is a useful option to trace messages."
        ),
    ] = None,
    no_confirm: Annotated[
        bool,
        Doc("Do not wait for Kafka publish confirmation."),
    ] = False,
) -> None:
    """Publish several messages with a single producer call.

    The producer's `publish_batch` is wrapped by the `publish_scope` of each
    middleware in `_middlewares` before being awaited.

    Raises:
        AssertionError: If no producer is set (skipped when assertions are
            disabled).
    """
    assert self._producer, NOT_CONNECTED_YET  # nosec B101

    # A missing or empty correlation id is replaced by a generated one.
    if not correlation_id:
        correlation_id = gen_cor_id()

    send: AsyncFunc = self._producer.publish_batch

    # Each middleware wraps the previous callable, so the last middleware
    # in `_middlewares` ends up as the outermost layer.
    for middleware in self._middlewares:
        send = partial(middleware(None).publish_scope, send)

    await send(
        *msgs,
        topic=topic,
        partition=partition,
        timestamp_ms=timestamp_ms,
        headers=headers,
        reply_to=reply_to,
        correlation_id=correlation_id,
        no_confirm=no_confirm,
    )

ping async #

ping(timeout)
Source code in faststream/kafka/broker/broker.py
@override
async def ping(self, timeout: Optional[float]) -> bool:
    """Report whether the broker's producer is currently open.

    Returns `False` immediately when no producer is set. Otherwise polls the
    underlying producer's `_closed` flag until it is falsy (returns `True`)
    or the `timeout` scope is cancelled (returns `False`).
    """
    # Poll ten times per timeout window; 10 is the fallback window when
    # `timeout` is falsy.
    poll_interval = (timeout or 10) / 10

    with anyio.move_on_after(timeout) as scope:
        if self._producer is None:
            return False

        while not scope.cancel_called:
            if not self._producer._producer._closed:
                return True

            await anyio.sleep(poll_interval)

    # Reached when the scope was cancelled before the producer opened.
    return False