Skip to content

KafkaRouter

faststream.kafka.router.KafkaRouter #

KafkaRouter(prefix='', handlers=(), *, dependencies=(), middlewares=(), parser=None, decoder=None, include_in_schema=None)

Bases: KafkaRegistrator, BrokerRouter[Union['ConsumerRecord', Tuple['ConsumerRecord', ...]]]

A router that can be included into a KafkaBroker.

Source code in faststream/kafka/router.py
def __init__(
    self,
    prefix: Annotated[
        str,
        Doc("String prefix to add to all subscribers queues."),
    ] = "",
    handlers: Annotated[
        Iterable[KafkaRoute],
        Doc("Route object to include."),
    ] = (),
    *,
    dependencies: Annotated[
        Iterable["Depends"],
        Doc(
            "Dependencies list (`[Depends(),]`) to apply to all routers' publishers/subscribers."
        ),
    ] = (),
    middlewares: Annotated[
        Sequence[
            Union[
                "BrokerMiddleware[ConsumerRecord]",
                "BrokerMiddleware[Tuple[ConsumerRecord, ...]]",
            ]
        ],
        Doc("Router middlewares to apply to all routers' publishers/subscribers."),
    ] = (),
    parser: Annotated[
        Optional["CustomCallable"],
        Doc("Parser to map original **ConsumerRecord** object to FastStream one."),
    ] = None,
    decoder: Annotated[
        Optional["CustomCallable"],
        Doc("Function to decode FastStream msg bytes body to python objects."),
    ] = None,
    include_in_schema: Annotated[
        Optional[bool],
        # Fixed typo in user-facing doc text: "Whetever" -> "Whether".
        Doc("Whether to include operation in AsyncAPI schema or not."),
    ] = None,
) -> None:
    """Initialize the Kafka router.

    All options are forwarded unchanged to the base router constructor;
    this class adds no state of its own here.
    """
    super().__init__(
        handlers=handlers,
        # basic args
        prefix=prefix,
        dependencies=dependencies,
        middlewares=middlewares,
        parser=parser,
        decoder=decoder,
        include_in_schema=include_in_schema,
    )

prefix instance-attribute #

prefix = prefix

include_in_schema instance-attribute #

include_in_schema = include_in_schema

add_middleware #

add_middleware(middleware)

Append BrokerMiddleware to the end of middlewares list.

The current middleware will be the innermost of the already existing ones.

Source code in faststream/broker/core/abc.py
def add_middleware(self, middleware: "BrokerMiddleware[MsgType]") -> None:
    """Append BrokerMiddleware to the end of middlewares list.

    Current middleware will be used as a most inner of already existed ones.
    """
    # Middlewares are stored as an immutable tuple; extend it with the newcomer.
    self._middlewares = (*self._middlewares, middleware)

    # Propagate the middleware to every already-registered endpoint,
    # subscribers first, then publishers (same order as registration).
    for endpoint in (*self._subscribers.values(), *self._publishers.values()):
        endpoint.add_middleware(middleware)

subscriber #

subscriber(*topics: str, batch: Literal[False] = False, group_id: Optional[str] = None, key_deserializer: Optional[Callable[[bytes], Any]] = None, value_deserializer: Optional[Callable[[bytes], Any]] = None, fetch_max_bytes: int = 50 * 1024 * 1024, fetch_min_bytes: int = 1, fetch_max_wait_ms: int = 500, max_partition_fetch_bytes: int = 1 * 1024 * 1024, auto_offset_reset: Literal['latest', 'earliest', 'none'] = 'latest', auto_commit: bool = True, auto_commit_interval_ms: int = 5 * 1000, check_crcs: bool = True, partition_assignment_strategy: Sequence[AbstractPartitionAssignor] = (RoundRobinPartitionAssignor), max_poll_interval_ms: int = 5 * 60 * 1000, rebalance_timeout_ms: Optional[int] = None, session_timeout_ms: int = 10 * 1000, heartbeat_interval_ms: int = 3 * 1000, consumer_timeout_ms: int = 200, max_poll_records: Optional[int] = None, exclude_internal_topics: bool = True, isolation_level: Literal['read_uncommitted', 'read_committed'] = 'read_uncommitted', batch_timeout_ms: int = 200, max_records: Optional[int] = None, listener: Optional[ConsumerRebalanceListener] = None, pattern: Optional[str] = None, partitions: Iterable[TopicPartition] = (), dependencies: Iterable[Depends] = (), parser: Optional[CustomCallable] = None, decoder: Optional[CustomCallable] = None, middlewares: Sequence[SubscriberMiddleware[KafkaMessage]] = (), filter: Filter[KafkaMessage] = default_filter, retry: bool = False, no_ack: bool = False, no_reply: bool = False, title: Optional[str] = None, description: Optional[str] = None, include_in_schema: bool = True) -> AsyncAPIDefaultSubscriber
subscriber(*topics: str, batch: Literal[True], group_id: Optional[str] = None, key_deserializer: Optional[Callable[[bytes], Any]] = None, value_deserializer: Optional[Callable[[bytes], Any]] = None, fetch_max_bytes: int = 50 * 1024 * 1024, fetch_min_bytes: int = 1, fetch_max_wait_ms: int = 500, max_partition_fetch_bytes: int = 1 * 1024 * 1024, auto_offset_reset: Literal['latest', 'earliest', 'none'] = 'latest', auto_commit: bool = True, auto_commit_interval_ms: int = 5 * 1000, check_crcs: bool = True, partition_assignment_strategy: Sequence[AbstractPartitionAssignor] = (RoundRobinPartitionAssignor), max_poll_interval_ms: int = 5 * 60 * 1000, rebalance_timeout_ms: Optional[int] = None, session_timeout_ms: int = 10 * 1000, heartbeat_interval_ms: int = 3 * 1000, consumer_timeout_ms: int = 200, max_poll_records: Optional[int] = None, exclude_internal_topics: bool = True, isolation_level: Literal['read_uncommitted', 'read_committed'] = 'read_uncommitted', batch_timeout_ms: int = 200, max_records: Optional[int] = None, listener: Optional[ConsumerRebalanceListener] = None, pattern: Optional[str] = None, partitions: Iterable[TopicPartition] = (), dependencies: Iterable[Depends] = (), parser: Optional[CustomCallable] = None, decoder: Optional[CustomCallable] = None, middlewares: Sequence[SubscriberMiddleware[KafkaMessage]] = (), filter: Filter[KafkaMessage] = default_filter, retry: bool = False, no_ack: bool = False, no_reply: bool = False, title: Optional[str] = None, description: Optional[str] = None, include_in_schema: bool = True) -> AsyncAPIBatchSubscriber
subscriber(*topics: str, batch: bool = False, group_id: Optional[str] = None, key_deserializer: Optional[Callable[[bytes], Any]] = None, value_deserializer: Optional[Callable[[bytes], Any]] = None, fetch_max_bytes: int = 50 * 1024 * 1024, fetch_min_bytes: int = 1, fetch_max_wait_ms: int = 500, max_partition_fetch_bytes: int = 1 * 1024 * 1024, auto_offset_reset: Literal['latest', 'earliest', 'none'] = 'latest', auto_commit: bool = True, auto_commit_interval_ms: int = 5 * 1000, check_crcs: bool = True, partition_assignment_strategy: Sequence[AbstractPartitionAssignor] = (RoundRobinPartitionAssignor), max_poll_interval_ms: int = 5 * 60 * 1000, rebalance_timeout_ms: Optional[int] = None, session_timeout_ms: int = 10 * 1000, heartbeat_interval_ms: int = 3 * 1000, consumer_timeout_ms: int = 200, max_poll_records: Optional[int] = None, exclude_internal_topics: bool = True, isolation_level: Literal['read_uncommitted', 'read_committed'] = 'read_uncommitted', batch_timeout_ms: int = 200, max_records: Optional[int] = None, listener: Optional[ConsumerRebalanceListener] = None, pattern: Optional[str] = None, partitions: Iterable[TopicPartition] = (), dependencies: Iterable[Depends] = (), parser: Optional[CustomCallable] = None, decoder: Optional[CustomCallable] = None, middlewares: Sequence[SubscriberMiddleware[KafkaMessage]] = (), filter: Filter[KafkaMessage] = default_filter, retry: bool = False, no_ack: bool = False, no_reply: bool = False, title: Optional[str] = None, description: Optional[str] = None, include_in_schema: bool = True) -> Union[AsyncAPIDefaultSubscriber, AsyncAPIBatchSubscriber]
subscriber(*topics, batch=False, group_id=None, key_deserializer=None, value_deserializer=None, fetch_max_bytes=50 * 1024 * 1024, fetch_min_bytes=1, fetch_max_wait_ms=500, max_partition_fetch_bytes=1 * 1024 * 1024, auto_offset_reset='latest', auto_commit=True, auto_commit_interval_ms=5 * 1000, check_crcs=True, partition_assignment_strategy=(RoundRobinPartitionAssignor), max_poll_interval_ms=5 * 60 * 1000, rebalance_timeout_ms=None, session_timeout_ms=10 * 1000, heartbeat_interval_ms=3 * 1000, consumer_timeout_ms=200, max_poll_records=None, exclude_internal_topics=True, isolation_level='read_uncommitted', batch_timeout_ms=200, max_records=None, listener=None, pattern=None, partitions=(), dependencies=(), parser=None, decoder=None, middlewares=(), max_workers=1, filter=default_filter, retry=False, no_ack=False, no_reply=False, title=None, description=None, include_in_schema=True)
Source code in faststream/kafka/broker/registrator.py
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
1350
1351
1352
1353
1354
1355
1356
1357
1358
1359
1360
1361
1362
1363
1364
1365
1366
1367
1368
1369
1370
1371
1372
1373
1374
1375
1376
1377
1378
1379
1380
1381
1382
1383
1384
1385
1386
1387
1388
1389
1390
1391
1392
1393
1394
1395
1396
1397
1398
1399
1400
1401
1402
1403
1404
1405
1406
1407
1408
1409
1410
1411
1412
1413
1414
1415
1416
1417
1418
1419
1420
1421
1422
1423
1424
1425
1426
1427
1428
1429
1430
1431
1432
1433
1434
1435
1436
1437
1438
1439
1440
1441
1442
1443
1444
1445
1446
1447
1448
1449
1450
1451
1452
1453
1454
1455
1456
1457
1458
1459
1460
1461
1462
1463
1464
1465
1466
1467
1468
1469
1470
1471
1472
1473
1474
1475
1476
1477
1478
1479
1480
1481
1482
1483
1484
1485
1486
1487
1488
1489
1490
1491
1492
1493
1494
1495
1496
1497
1498
1499
1500
1501
1502
1503
1504
1505
1506
1507
1508
1509
1510
1511
1512
1513
1514
1515
1516
1517
1518
1519
1520
1521
1522
1523
1524
1525
1526
1527
1528
1529
1530
1531
1532
1533
1534
1535
1536
1537
1538
1539
1540
1541
1542
1543
1544
1545
1546
1547
1548
1549
1550
1551
1552
1553
1554
1555
1556
1557
1558
1559
1560
1561
1562
1563
1564
1565
1566
1567
1568
1569
1570
1571
1572
1573
1574
1575
1576
1577
1578
1579
1580
1581
1582
1583
1584
1585
1586
1587
1588
1589
1590
1591
1592
1593
1594
1595
1596
1597
1598
1599
1600
1601
1602
1603
1604
1605
1606
1607
1608
1609
1610
1611
1612
1613
1614
1615
1616
1617
1618
1619
1620
1621
1622
1623
1624
1625
1626
1627
1628
1629
1630
1631
1632
1633
1634
1635
1636
1637
1638
1639
1640
1641
1642
1643
1644
1645
1646
1647
1648
1649
1650
1651
1652
1653
1654
1655
1656
1657
1658
1659
1660
1661
1662
1663
1664
1665
1666
1667
1668
1669
1670
1671
1672
1673
1674
1675
1676
1677
@override
def subscriber(
    self,
    *topics: Annotated[
        str,
        Doc("Kafka topics to consume messages from."),
    ],
    batch: Annotated[
        bool,
        Doc("Whether to consume messages in batches or not."),
    ] = False,
    group_id: Annotated[
        Optional[str],
        Doc(
            """
        Name of the consumer group to join for dynamic
        partition assignment (if enabled), and to use for fetching and
        committing offsets. If `None`, auto-partition assignment (via
        group coordinator) and offset commits are disabled.
        """
        ),
    ] = None,
    key_deserializer: Annotated[
        Optional[Callable[[bytes], Any]],
        Doc(
            "Any callable that takes a raw message `bytes` "
            "key and returns a deserialized one."
        ),
    ] = None,
    value_deserializer: Annotated[
        Optional[Callable[[bytes], Any]],
        Doc(
            "Any callable that takes a raw message `bytes` "
            "value and returns a deserialized value."
        ),
    ] = None,
    fetch_max_bytes: Annotated[
        int,
        Doc(
            """
        The maximum amount of data the server should
        return for a fetch request. This is not an absolute maximum, if
        the first message in the first non-empty partition of the fetch
        is larger than this value, the message will still be returned
        to ensure that the consumer can make progress. NOTE: consumer
        performs fetches to multiple brokers in parallel so memory
        usage will depend on the number of brokers containing
        partitions for the topic.
        """
        ),
    ] = 50 * 1024 * 1024,
    fetch_min_bytes: Annotated[
        int,
        Doc(
            """
        Minimum amount of data the server should
        return for a fetch request, otherwise wait up to
        `fetch_max_wait_ms` for more data to accumulate.
        """
        ),
    ] = 1,
    fetch_max_wait_ms: Annotated[
        int,
        Doc(
            """
        The maximum amount of time in milliseconds
        the server will block before answering the fetch request if
        there isn't sufficient data to immediately satisfy the
        requirement given by `fetch_min_bytes`.
        """
        ),
    ] = 500,
    max_partition_fetch_bytes: Annotated[
        int,
        Doc(
            """
        The maximum amount of data
        per-partition the server will return. The maximum total memory
        used for a request ``= #partitions * max_partition_fetch_bytes``.
        This size must be at least as large as the maximum message size
        the server allows or else it is possible for the producer to
        send messages larger than the consumer can fetch. If that
        happens, the consumer can get stuck trying to fetch a large
        message on a certain partition.
        """
        ),
    ] = 1 * 1024 * 1024,
    auto_offset_reset: Annotated[
        Literal["latest", "earliest", "none"],
        Doc(
            """
        A policy for resetting offsets on `OffsetOutOfRangeError` errors:

        * `earliest` will move to the oldest available message
        * `latest` will move to the most recent
        * `none` will raise an exception so you can handle this case
        """
        ),
    ] = "latest",
    auto_commit: Annotated[
        bool,
        Doc(
            """
        If `True` the consumer's offset will be
        periodically committed in the background.
        """
        ),
    ] = True,
    auto_commit_interval_ms: Annotated[
        int,
        Doc(
            """
        Milliseconds between automatic
        offset commits, if `auto_commit` is `True`."""
        ),
    ] = 5 * 1000,
    check_crcs: Annotated[
        bool,
        Doc(
            """
        Automatically check the CRC32 of the records
        consumed. This ensures no on-the-wire or on-disk corruption to
        the messages occurred. This check adds some overhead, so it may
        be disabled in cases seeking extreme performance.
        """
        ),
    ] = True,
    partition_assignment_strategy: Annotated[
        Sequence["AbstractPartitionAssignor"],
        Doc(
            """
        List of objects to use to
        distribute partition ownership amongst consumer instances when
        group management is used. This preference is implicit in the order
        of the strategies in the list. When assignment strategy changes:
        to support a change to the assignment strategy, new versions must
        enable support both for the old assignment strategy and the new
        one. The coordinator will choose the old assignment strategy until
        all members have been updated. Then it will choose the new
        strategy.
        """
        ),
    ] = (RoundRobinPartitionAssignor,),
    max_poll_interval_ms: Annotated[
        int,
        Doc(
            """
        Maximum allowed time between calls to
        consume messages in batches. If this interval
        is exceeded the consumer is considered failed and the group will
        rebalance in order to reassign the partitions to another consumer
        group member. If API methods block waiting for messages, that time
        does not count against this timeout.
        """
        ),
    ] = 5 * 60 * 1000,
    rebalance_timeout_ms: Annotated[
        Optional[int],
        Doc(
            """
        The maximum time server will wait for this
        consumer to rejoin the group in a case of rebalance. In Java client
        this behaviour is bound to `max.poll.interval.ms` configuration,
        but as ``aiokafka`` will rejoin the group in the background, we
        decouple this setting to allow finer tuning by users that use
        `ConsumerRebalanceListener` to delay rebalancing. Defaults
        to ``session_timeout_ms``
        """
        ),
    ] = None,
    session_timeout_ms: Annotated[
        int,
        Doc(
            """
        Client group session and failure detection
        timeout. The consumer sends periodic heartbeats
        (`heartbeat.interval.ms`) to indicate its liveness to the broker.
        If no heartbeats are received by the broker for a group member within
        the session timeout, the broker will remove the consumer from the
        group and trigger a rebalance. The allowed range is configured with
        the **broker** configuration properties
        `group.min.session.timeout.ms` and `group.max.session.timeout.ms`.
        """
        ),
    ] = 10 * 1000,
    heartbeat_interval_ms: Annotated[
        int,
        Doc(
            """
        The expected time in milliseconds
        between heartbeats to the consumer coordinator when using
        Kafka's group management feature. Heartbeats are used to ensure
        that the consumer's session stays active and to facilitate
        rebalancing when new consumers join or leave the group. The
        value must be set lower than `session_timeout_ms`, but typically
        should be set no higher than 1/3 of that value. It can be
        adjusted even lower to control the expected time for normal
        rebalances.
        """
        ),
    ] = 3 * 1000,
    consumer_timeout_ms: Annotated[
        int,
        Doc(
            """
        Maximum wait timeout for background fetching
        routine. Mostly defines how fast the system will see rebalance and
        request new data for new partitions.
        """
        ),
    ] = 200,
    max_poll_records: Annotated[
        Optional[int],
        Doc(
            """
        The maximum number of records returned in a
        single call by batch consumer. Has no limit by default.
        """
        ),
    ] = None,
    exclude_internal_topics: Annotated[
        bool,
        Doc(
            """
        Whether records from internal topics
        (such as offsets) should be exposed to the consumer. If set to True
        the only way to receive records from an internal topic is
        subscribing to it.
        """
        ),
    ] = True,
    isolation_level: Annotated[
        Literal["read_uncommitted", "read_committed"],
        Doc(
            """
        Controls how to read messages written
        transactionally.

        * `read_committed`, batch consumer will only return
        transactional messages which have been committed.

        * `read_uncommitted` (the default), batch consumer will
        return all messages, even transactional messages which have been
        aborted.

        Non-transactional messages will be returned unconditionally in
        either mode.

        Messages will always be returned in offset order. Hence, in
        `read_committed` mode, batch consumer will only return
        messages up to the last stable offset (LSO), which is the one less
        than the offset of the first open transaction. In particular any
        messages appearing after messages belonging to ongoing transactions
        will be withheld until the relevant transaction has been completed.
        As a result, `read_committed` consumers will not be able to read up
        to the high watermark when there are in flight transactions.
        Further, when in `read_committed` the seek_to_end method will
        return the LSO. See method docs below.
        """
        ),
    ] = "read_uncommitted",
    batch_timeout_ms: Annotated[
        int,
        Doc(
            """
        Milliseconds spent waiting if
        data is not available in the buffer. If 0, returns immediately
        with any records that are available currently in the buffer,
        else returns empty.
        """
        ),
    ] = 200,
    max_records: Annotated[
        Optional[int],
        Doc("Number of messages to consume as one batch."),
    ] = None,
    listener: Annotated[
        Optional["ConsumerRebalanceListener"],
        Doc(
            """
        Optionally include listener
           callback, which will be called before and after each rebalance
           operation.
           As part of group management, the consumer will keep track of
           the list of consumers that belong to a particular group and
           will trigger a rebalance operation if one of the following
           events trigger:

           * Number of partitions change for any of the subscribed topics
           * Topic is created or deleted
           * An existing member of the consumer group dies
           * A new member is added to the consumer group

           When any of these events are triggered, the provided listener
           will be invoked first to indicate that the consumer's
           assignment has been revoked, and then again when the new
           assignment has been received. Note that this listener will
           immediately override any listener set in a previous call
           to subscribe. It is guaranteed, however, that the partitions
           revoked/assigned
           through this interface are from topics subscribed in this call.
        """
        ),
    ] = None,
    pattern: Annotated[
        Optional[str],
        Doc(
            """
        Pattern to match available topics. You must provide either topics or pattern, but not both.
        """
        ),
    ] = None,
    partitions: Annotated[
        Iterable["TopicPartition"],
        Doc(
            """
        An explicit partitions list to assign.
        You can't use 'topics' and 'partitions' at the same time.
        """
        ),
    ] = (),
    # broker args
    dependencies: Annotated[
        Iterable["Depends"],
        Doc("Dependencies list (`[Depends(),]`) to apply to the subscriber."),
    ] = (),
    parser: Annotated[
        Optional["CustomCallable"],
        Doc("Parser to map original **ConsumerRecord** object to FastStream one."),
    ] = None,
    decoder: Annotated[
        Optional["CustomCallable"],
        Doc("Function to decode FastStream msg bytes body to python objects."),
    ] = None,
    middlewares: Annotated[
        Sequence["SubscriberMiddleware[KafkaMessage]"],
        Doc("Subscriber middlewares to wrap incoming message processing."),
    ] = (),
    max_workers: Annotated[
        int,
        Doc("Number of workers to process messages concurrently."),
    ] = 1,
    filter: Annotated[
        "Filter[KafkaMessage]",
        Doc(
            "Overload subscriber to consume various messages from the same source."
        ),
        deprecated(
            "Deprecated in **FastStream 0.5.0**. "
            "Please, create `subscriber` object and use it explicitly instead. "
            "Argument will be removed in **FastStream 0.6.0**."
        ),
    ] = default_filter,
    retry: Annotated[
        bool,
        Doc("Whether to `nack` message at processing exception."),
    ] = False,
    no_ack: Annotated[
        bool,
        Doc("Whether to disable **FastStream** autoacknowledgement logic or not."),
    ] = False,
    no_reply: Annotated[
        bool,
        Doc(
            "Whether to disable **FastStream** RPC and Reply To auto responses or not."
        ),
    ] = False,
    # AsyncAPI args
    title: Annotated[
        Optional[str],
        Doc("AsyncAPI subscriber object title."),
    ] = None,
    description: Annotated[
        Optional[str],
        Doc(
            "AsyncAPI subscriber object description. "
            "Uses decorated docstring as default."
        ),
    ] = None,
    include_in_schema: Annotated[
        bool,
        # Fixed typo in user-facing doc text: "Whetever" -> "Whether".
        Doc("Whether to include operation in AsyncAPI schema or not."),
    ] = True,
) -> Union[
    "AsyncAPIDefaultSubscriber",
    "AsyncAPIBatchSubscriber",
    "AsyncAPIConcurrentDefaultSubscriber",
]:
    """Create and register a subscriber for the given Kafka topics.

    The subscriber object is built by ``create_subscriber`` and registered
    through the parent class. The concrete return type depends on the
    arguments: a batch subscriber when ``batch`` is True, a concurrent
    subscriber when ``max_workers > 1``, and a default one otherwise.
    """
    subscriber = super().subscriber(
        create_subscriber(
            *topics,
            batch=batch,
            max_workers=max_workers,
            batch_timeout_ms=batch_timeout_ms,
            max_records=max_records,
            group_id=group_id,
            listener=listener,
            pattern=pattern,
            # Passed straight through to the underlying consumer; note that
            # FastStream's `auto_commit` maps to aiokafka's `enable_auto_commit`.
            connection_args={
                "key_deserializer": key_deserializer,
                "value_deserializer": value_deserializer,
                "fetch_max_wait_ms": fetch_max_wait_ms,
                "fetch_max_bytes": fetch_max_bytes,
                "fetch_min_bytes": fetch_min_bytes,
                "max_partition_fetch_bytes": max_partition_fetch_bytes,
                "auto_offset_reset": auto_offset_reset,
                "enable_auto_commit": auto_commit,
                "auto_commit_interval_ms": auto_commit_interval_ms,
                "check_crcs": check_crcs,
                "partition_assignment_strategy": partition_assignment_strategy,
                "max_poll_interval_ms": max_poll_interval_ms,
                "rebalance_timeout_ms": rebalance_timeout_ms,
                "session_timeout_ms": session_timeout_ms,
                "heartbeat_interval_ms": heartbeat_interval_ms,
                "consumer_timeout_ms": consumer_timeout_ms,
                "max_poll_records": max_poll_records,
                "exclude_internal_topics": exclude_internal_topics,
                "isolation_level": isolation_level,
            },
            partitions=partitions,
            # Disabled auto-commit implies manual acknowledgement mode.
            is_manual=not auto_commit,
            # subscriber args
            no_ack=no_ack,
            no_reply=no_reply,
            retry=retry,
            broker_middlewares=self._middlewares,
            broker_dependencies=self._dependencies,
            # AsyncAPI
            title_=title,
            description_=description,
            include_in_schema=self._solve_include_in_schema(include_in_schema),
        )
    )

    # Router-level parser/decoder act as fallbacks for the per-subscriber ones.
    if batch:
        return cast("AsyncAPIBatchSubscriber", subscriber).add_call(
            filter_=filter,
            parser_=parser or self._parser,
            decoder_=decoder or self._decoder,
            dependencies_=dependencies,
            middlewares_=middlewares,
        )
    elif max_workers > 1:
        return cast("AsyncAPIConcurrentDefaultSubscriber", subscriber).add_call(
            filter_=filter,
            parser_=parser or self._parser,
            decoder_=decoder or self._decoder,
            dependencies_=dependencies,
            middlewares_=middlewares,
        )
    else:
        return cast("AsyncAPIDefaultSubscriber", subscriber).add_call(
            filter_=filter,
            parser_=parser or self._parser,
            decoder_=decoder or self._decoder,
            dependencies_=dependencies,
            middlewares_=middlewares,
        )
publisher #

publisher(topic: str, *, key: Union[bytes, Any, None] = None, partition: Optional[int] = None, headers: Optional[Dict[str, str]] = None, reply_to: str = '', batch: Literal[False] = False, middlewares: Sequence[PublisherMiddleware] = (), title: Optional[str] = None, description: Optional[str] = None, schema: Optional[Any] = None, include_in_schema: bool = True) -> AsyncAPIDefaultPublisher
publisher(topic: str, *, key: Union[bytes, Any, None] = None, partition: Optional[int] = None, headers: Optional[Dict[str, str]] = None, reply_to: str = '', batch: Literal[True], middlewares: Sequence[PublisherMiddleware] = (), title: Optional[str] = None, description: Optional[str] = None, schema: Optional[Any] = None, include_in_schema: bool = True) -> AsyncAPIBatchPublisher
publisher(topic: str, *, key: Union[bytes, Any, None] = None, partition: Optional[int] = None, headers: Optional[Dict[str, str]] = None, reply_to: str = '', batch: bool = False, middlewares: Sequence[PublisherMiddleware] = (), title: Optional[str] = None, description: Optional[str] = None, schema: Optional[Any] = None, include_in_schema: bool = True) -> Union[AsyncAPIBatchPublisher, AsyncAPIDefaultPublisher]
publisher(topic, *, key=None, partition=None, headers=None, reply_to='', batch=False, middlewares=(), title=None, description=None, schema=None, include_in_schema=True)

Creates long-living and AsyncAPI-documented publisher object.

You can use it as a handler decorator (handler should be decorated by @broker.subscriber(...) too) - @broker.publisher(...). In such case publisher will publish your handler return value.

Or you can create a publisher object to call it later - broker.publisher(...).publish(...).

Source code in faststream/kafka/broker/registrator.py
@override
def publisher(
    self,
    topic: Annotated[
        str,
        Doc("Topic where the message will be published."),
    ],
    *,
    key: Annotated[
        Union[bytes, Any, None],
        Doc(
            """
        A key to associate with the message. Can be used to
        determine which partition to send the message to. If partition
        is `None` (and producer's partitioner config is left as default),
        then messages with the same key will be delivered to the same
        partition (but if key is `None`, partition is chosen randomly).
        Must be type `bytes`, or be serializable to bytes via configured
        `key_serializer`.
        """
        ),
    ] = None,
    partition: Annotated[
        Optional[int],
        Doc(
            """
        Specify a partition. If not set, the partition will be
        selected using the configured `partitioner`.
        """
        ),
    ] = None,
    headers: Annotated[
        Optional[Dict[str, str]],
        Doc(
            "Message headers to store metainformation. "
            "**content-type** and **correlation_id** will be set automatically by framework anyway. "
            "Can be overridden by `publish.headers` if specified."
        ),
    ] = None,
    reply_to: Annotated[
        str,
        Doc("Topic name to send response."),
    ] = "",
    batch: Annotated[
        bool,
        Doc("Whether to send messages in batches or not."),
    ] = False,
    # basic args
    middlewares: Annotated[
        Sequence["PublisherMiddleware"],
        Doc("Publisher middlewares to wrap outgoing messages."),
    ] = (),
    # AsyncAPI args
    title: Annotated[
        Optional[str],
        Doc("AsyncAPI publisher object title."),
    ] = None,
    description: Annotated[
        Optional[str],
        Doc("AsyncAPI publisher object description."),
    ] = None,
    schema: Annotated[
        Optional[Any],
        Doc(
            "AsyncAPI publishing message type. "
            "Should be any python-native object annotation or `pydantic.BaseModel`."
        ),
    ] = None,
    include_in_schema: Annotated[
        bool,
        # Fixed typo in user-facing doc text: "Whetever" -> "Whether".
        Doc("Whether to include operation in AsyncAPI schema or not."),
    ] = True,
) -> Union[
    "AsyncAPIBatchPublisher",
    "AsyncAPIDefaultPublisher",
]:
    """Creates long-living and AsyncAPI-documented publisher object.

    You can use it as a handler decorator (handler should be decorated by `@broker.subscriber(...)` too) - `@broker.publisher(...)`.
    In such case publisher will publish your handler return value.

    Or you can create a publisher object to call it later - `broker.publisher(...).publish(...)`.
    """
    publisher = AsyncAPIPublisher.create(
        # batch flag
        batch=batch,
        # default args
        key=key,
        # both args
        topic=topic,
        partition=partition,
        headers=headers,
        reply_to=reply_to,
        # publisher-specific
        broker_middlewares=self._middlewares,
        middlewares=middlewares,
        # AsyncAPI
        title_=title,
        description_=description,
        schema_=schema,
        include_in_schema=self._solve_include_in_schema(include_in_schema),
    )

    # `AsyncAPIPublisher.create` already picked the batch/default variant;
    # the cast only narrows the static type for callers.
    if batch:
        return cast("AsyncAPIBatchPublisher", super().publisher(publisher))
    else:
        return cast("AsyncAPIDefaultPublisher", super().publisher(publisher))

include_router #

include_router(router, *, prefix='', dependencies=(), middlewares=(), include_in_schema=None)

Includes a router in the current object.

Source code in faststream/broker/core/abc.py
def include_router(
    self,
    router: "ABCBroker[Any]",
    *,
    prefix: str = "",
    dependencies: Iterable["Depends"] = (),
    middlewares: Iterable["BrokerMiddleware[MsgType]"] = (),
    include_in_schema: Optional[bool] = None,
) -> None:
    """Includes a router in the current object."""
    subscriber_prefix = "".join((self.prefix, prefix))

    for sub in router._subscribers.values():
        sub.add_prefix(subscriber_prefix)

        key = hash(sub)
        if key in self._subscribers:
            continue

        # An explicit flag wins; otherwise inherit this object's schema policy.
        if include_in_schema is None:
            sub.include_in_schema = self._solve_include_in_schema(
                sub.include_in_schema
            )
        else:
            sub.include_in_schema = include_in_schema

        # The including object's middlewares/dependencies wrap the router's own.
        sub._broker_middlewares = (
            *self._middlewares,
            *middlewares,
            *sub._broker_middlewares,
        )
        sub._broker_dependencies = (
            *self._dependencies,
            *dependencies,
            *sub._broker_dependencies,
        )
        self._subscribers = {**self._subscribers, key: sub}

    for pub in router._publishers.values():
        # Note: publishers only receive this object's own prefix.
        pub.add_prefix(self.prefix)

        key = hash(pub)
        if key in self._publishers:
            continue

        if include_in_schema is None:
            pub.include_in_schema = self._solve_include_in_schema(
                pub.include_in_schema
            )
        else:
            pub.include_in_schema = include_in_schema

        pub._broker_middlewares = (
            *self._middlewares,
            *middlewares,
            *pub._broker_middlewares,
        )
        self._publishers = {**self._publishers, key: pub}

include_routers #

include_routers(*routers)

Includes routers in the object.

Source code in faststream/broker/core/abc.py
def include_routers(self, *routers: "ABCBroker[MsgType]") -> None:
    """Include every given router into the current object, one by one."""
    for one_router in routers:
        self.include_router(one_router)