Skip to content

Release Notes#

0.5.22#

What's Changed#

Full Changelog: #0.5.21...0.5.22

0.5.21#

What's Changed#

  • feat (#1168): allow include regular router to FastAPI integration by @Lancetnik in #1747
  • In case if core-subscriber receive a JetStream message. by @sheldygg in #1751
  • feat: explicit final message commit status by @Lancetnik in #1754
  • Fix/context get local default by @Lancetnik in #1752
  • fix (#1759): correct ConfluentConfig with enums by @Lancetnik in #1762
  • Adds SASLOAuthBearer flow to AIO Kafka's Faststream Security Parsing by @sifex in #1761
  • fix: FastAPI 0.112.3 compatibility by @Lancetnik in #1763

Full Changelog: #0.5.20...0.5.21

0.5.20#

What's Changed#

Full Changelog: #0.5.19...0.5.20

0.5.19#

What's Changed#

The current release is planned as a latest feature release before 0.6.0. All other 0.5.19+ releases will contain only minor bugfixes and all the team work will be focused on next major one.

There's a lot of changes we want to present you now though!

New RPC feature#

Our old broker.publish(..., rpc=True) implementation was very limited and ugly. Now we present you a much suitable way to do the same thing - broker.request(...)

from faststream import FastStream
from faststream.nats import NatsBroker, NatsResponse, NatsMessage

broker = NatsBroker()

@broker.subscriber("test")
async def echo_handler(msg):
    return NatsResponse(msg, headers={"x-token": "some-token"})

@app.after_startup
async def test():
    # The old implementation was returning just a message body,
    # so you wasn't be able to check response headers, etc
    msg_body: str = await broker.publish("ping", "test", rpc=True)
    assert msg_body == "ping"

    # Now request return the whole message and you can validate any part of it
    # moreover it triggers all your middlewares
    response: NatsMessage = await broker.request("ping", "test")

Exception Middleware#

Community asked and community did! Sorry, we've been putting off this job for too long. Thanks for @Rusich90 to help us! Now you can wrap your application by a suitable exception handlers. Just check the new documentation to learn more.

Details#

Also, there are a lot of minor changes you can find below. Big thanks to all our old and new contributors! You are amazing ones!

  • Bug: resolve missing seek on kafka fakeconsumer by @JonathanSerafini in https://github.com/airtai/faststream/pull/1682
  • replace pip with uv in CI by @newonlynew in https://github.com/airtai/faststream/pull/1688
  • Added support for JSON serialization and deserialization by other libraries by @ulbwa in https://github.com/airtai/faststream/pull/1687
  • Fix batch nack by @kumaranvpl in https://github.com/airtai/faststream/pull/1689
  • Remove unused ignores by @kumaranvpl in https://github.com/airtai/faststream/pull/1690
  • docs: add Kafka HowTo section by @Lancetnik in https://github.com/airtai/faststream/pull/1686
  • Add missed out group_instance_id as subscriber and router parameter by @kumaranvpl in https://github.com/airtai/faststream/pull/1695
  • Set warn_unused_ignores mypy config to true by @kumaranvpl in https://github.com/airtai/faststream/pull/1694
  • Skip building docs in pre-commit CI job by @kumaranvpl in https://github.com/airtai/faststream/pull/1704
  • Fix to run check-docs-changes workflow in forks by @kumaranvpl in https://github.com/airtai/faststream/pull/1710
  • feature/exception_middleware add exception middleware by @Rusich90 in https://github.com/airtai/faststream/pull/1604
  • Remove mentions of faststream-gen by @kumaranvpl in https://github.com/airtai/faststream/pull/1717
  • Fix multiple docs issues by @kumaranvpl in https://github.com/airtai/faststream/pull/1718
  • CI: group Dependabot updates into one PR by @dolfinus in https://github.com/airtai/faststream/pull/1719
  • feat: CLI DX improvements by @Lancetnik in https://github.com/airtai/faststream/pull/1723
  • fix: use async test subscribers functions by @Lancetnik in https://github.com/airtai/faststream/pull/1725
  • feat: add broker.request method by @Lancetnik in https://github.com/airtai/faststream/pull/1649

New Contributors#

  • @JonathanSerafini made their first contribution in https://github.com/airtai/faststream/pull/1682
  • @Rusich90 made their first contribution in https://github.com/airtai/faststream/pull/1604
  • @dolfinus made their first contribution in https://github.com/airtai/faststream/pull/1719

Full Changelog: https://github.com/airtai/faststream/compare/0.5.18...0.5.19

0.5.18#

What's Changed#

New Contributors#

  • @ulbwa made their first contribution in #1659
  • @Kirill-Stepankov made their first contribution in #1662

Full Changelog: #0.5.17...0.5.18

0.5.17#

What's Changed#

Just a hotfix for the following case:

@broker.subscriber(...)
async def handler():
    return NatsResponse(...)

await broker.publish(..., rpc=True)
  • chore(deps): bump semgrep from 1.83.0 to 1.84.0 by @dependabot in #1650
  • chore(deps): bump mkdocs-material from 9.5.30 to 9.5.31 by @dependabot in #1651
  • Update Release Notes for 0.5.16 by @faststream-release-notes-updater in #1652
  • hotfix: correct NatsResponse processing in RPC case by @Lancetnik in #1654

Full Changelog: #0.5.16...0.5.17

0.5.16#

What's Changed#

Well, seems like it is the biggest patch release ever 😃

Detail Responses#

First of all, thanks to all new contributors, who helps us to improve the project! They made a huge impact to this release by adding new Kafka security mechanisms and extend Response API - now you can use broker.Response to publish detail information from handler

@broker.subscriber("in")
@broker.publisher("out")
async def handler(msg):
    return Response(msg, headers={"response_header": "Hi!"})   # or KafkaResponse, etc

ASGI#

Also, we added a new huge feature - ASGI support!

Nope, we are not HTTP-framework now, but it is a little ASGI implementation to provide you with an ability to host documentation, use k8s http-probes and serve metrics in the same with you broker runtime without any dependencies.

You just need to use AsgiFastStream class

from faststream.nats import NatsBroker
from faststream.asgi import AsgiFastStream, make_ping_asgi

from prometheus_client import make_asgi_app
from prometheus_client.registry import CollectorRegistry

broker = NatsBroker()

prometheus_registry = CollectorRegistry()

app = AsgiFastStream(
    broker,
    asyncapi_path="/docs",
    asgi_routes=[
        ("/health", make_ping_asgi(broker, timeout=5.0)),
        ("/metrics", make_asgi_app(registry=prometheus_registry))
    ]
)

And then you can run it like a regular ASGI app

uvicorn main:app

Confluent partitions#

One more thing - manual topic partition assignment for Confluent. We have it already for aiokafka, but missed it here... Now it was fixed!

from faststream.confluent import TopicPartition

@broker.subscriber(partitions=[
    TopicPartition("test-topic", partition=0),
])
async def handler():
    ...

Detail changes#

  • feat: add RMQ fail_fast option in #1647
  • fix: correct nested NatsRouter subjects prefixes behavior
  • fix typos by @newonlynew in https://github.com/airtai/faststream/pull/1609
  • Feat: extend response api by @Flosckow in https://github.com/airtai/faststream/pull/1607
  • Feature: GSSAPI (Kerberos) support by @roma-frolov in https://github.com/airtai/faststream/pull/1633
  • feat: add oauth support by @filip-danieluk in https://github.com/airtai/faststream/pull/1632
  • fix: patch broker within testbroker context only by @sfran96 in https://github.com/airtai/faststream/pull/1619
  • feat: ASGI support by @Lancetnik in https://github.com/airtai/faststream/pull/1635

New Contributors#

  • @newonlynew made their first contribution in https://github.com/airtai/faststream/pull/1609
  • @roma-frolov made their first contribution in https://github.com/airtai/faststream/pull/1633
  • @filip-danieluk made their first contribution in https://github.com/airtai/faststream/pull/1632
  • @sfran96 made their first contribution in https://github.com/airtai/faststream/pull/1619

Full Changelog: https://github.com/airtai/faststream/compare/0.5.15...0.5.16

0.5.15#

What's Changed#

Finally, FastStream has a Kafka pattern subscription! This is another step forward in our Roadmap moving us to 0.6.0 and further!

from faststream import Path
from faststream.kafka import KafkaBroker

broker = KafkaBroker()

@broker.subscriber(pattern="logs.{level}")
async def base_handler(
    body: str,
    level: str = Path(),
):
    ...

Also, all brokers now supports a new ping method to check real broker connection

is_connected: bool = await broker.ping()

This is a little, but important change for K8S probes support

More other there are a lot of bugfixes and improvements from our contributors! Thanks to all of these amazing people!

  • feat(multiprocess): restart child processes if they are not alive by @gostilovichd in https://github.com/airtai/faststream/pull/1550
  • fix: use typing_extensions.TypedDict import by @Lancetnik in https://github.com/airtai/faststream/pull/1575
  • fix: correct single dataclass argument AsyncAPI payload generation by @Lancetnik in https://github.com/airtai/faststream/pull/1591
  • fix (#1598): use config with NATS PullSub by @Lancetnik in https://github.com/airtai/faststream/pull/1599
  • feat: default call_name for broker.subscriber by @KrySeyt in https://github.com/airtai/faststream/pull/1589
  • Feat: init ping method by @Flosckow in https://github.com/airtai/faststream/pull/1592
  • chore: bump nats-py requirement by @Lancetnik in https://github.com/airtai/faststream/pull/1600
  • fix: add pattern checking by @spataphore1337 in https://github.com/airtai/faststream/pull/1590

New Contributors#

  • @gostilovichd made their first contribution in https://github.com/airtai/faststream/pull/1550
  • @KrySeyt made their first contribution in https://github.com/airtai/faststream/pull/1589
  • @Flosckow made their first contribution in https://github.com/airtai/faststream/pull/1592

Full Changelog: https://github.com/airtai/faststream/compare/0.5.14...0.5.15

0.5.14#

What's Changed#

  • Update Release Notes for 0.5.13 by @faststream-release-notes-updater in #1548
  • Add allow_auto_create_topics to make automatic topic creation configurable by @kumaranvpl in #1556

Full Changelog: #0.5.13...0.5.14

0.5.13#

What's Changed#

New Contributors#

Full Changelog: #0.5.12...0.5.13

0.5.12#

What's Changed#

Now, FastStream provides users with the ability to pass the config dictionary to confluent-kafka-python for greater customizability. The following example sets the parameter topic.metadata.refresh.fast.interval.ms's value to 300 instead of the default value 100 via the config parameter.

from faststream import FastStream
from faststream.confluent import KafkaBroker

config = {"topic.metadata.refresh.fast.interval.ms": 300}
broker = KafkaBroker("localhost:9092", config=config)
app = FastStream(broker)
  • Update Release Notes for 0.5.11 by @faststream-release-notes-updater in #1511
  • docs: update filters example by @Lancetnik in #1516
  • Add config param to pass additional parameters to confluent-kafka-python by @kumaranvpl in #1505

Full Changelog: #0.5.11...0.5.12

0.5.11#

What's Changed#

New Contributors#

Full Changelog: #0.5.10...0.5.11

0.5.10#

What's Changed#

Now you can return Response class to set more specific outgoing message parameters:

from faststream import Response

@broker.subscriber("in")
@broker.subscriber("out")
async def handler():
    return Response(body=b"", headers={})

New Contributors#

Full Changelog: #0.5.9...0.5.10

0.5.9#

What's Changed#

  • Update Release Notes for 0.5.8 by @faststream-release-notes-updater in #1462
  • Exclude typing_extensions version 4.12.* by @kumaranvpl in #1467
  • fix: add group/consumer to hash to avoid overwriting by @fbraem in #1463
  • Bump version to 0.5.9 by @kumaranvpl in #1468

New Contributors#

Full Changelog: #0.5.8...0.5.9

0.5.8#

What's Changed#

This is the time for a new NATS features! FastStream supports NATS Key-Value and Object Storage subscription features in a native way now (big thx for @sheldygg)!

  1. KeyValue creation and watching API added (you can read updated documentation section for changes):
 from faststream import FastStream, Logger
 from faststream.nats import NatsBroker

 broker = NatsBroker()
 app = FastStream(broker)

 @broker.subscriber("some-key", kv_watch="bucket")
 async def handler(msg: int, logger: Logger):
     logger.info(msg)

 @app.after_startup
 async def test():
     kv = await broker.key_value("bucket")
     await kv.put("some-key", b"1")
  1. ObjectStore API added as well (you can read updated documentation section for changes):

    from faststream import FastStream, Logger
    from faststream.nats import NatsBroker
    
    broker = NatsBroker()
    app = FastStream(broker)
    
    @broker.subscriber("file-bucket", obj_watch=True)
    async def handler(filename: str, logger: Logger):
        logger.info(filename)
    
    @app.after_startup
    async def test():
        object_store = await broker.object_storage("file-bucket")
        await object_store.put("some-file.txt", b"1")
    
  2. Also now you can use just pull_sub=True instead of pull_sub=PullSub() in basic case:

```python from faststream import FastStream, Logger from faststream.nats import NatsBroker

broker = NatsBroker()
app = FastStream(broker)

@broker.subscriber("test", stream="stream", pull_sub=True)
async def handler(msg, logger: Logger):
    logger.info(msg)
```

Finally, we have a new feature, related to all brokers: special flag to suppress automatic RPC and reply_to responses:

@broker.subscriber("tests", no_reply=True)
async def handler():
    ....

# will fail with timeout, because there is no automatic response
msg = await broker.publish("msg", "test", rpc=True)
  • fix: when headers() returns None in AsyncConfluentParser, replace it with an empty tuple by @andreaimprovised in https://github.com/airtai/faststream/pull/1460
  • Implement Kv/Obj watch. by @sheldygg in https://github.com/airtai/faststream/pull/1383
  • feat: add subscriber no-reply option by @Lancetnik in https://github.com/airtai/faststream/pull/1461

New Contributors#

  • @andreaimprovised made their first contribution in https://github.com/airtai/faststream/pull/1460

Full Changelog: https://github.com/airtai/faststream/compare/0.5.7...0.5.8

0.5.7#

What's Changed#

Finally, FastStream supports OpenTelemetry in a native way to collect the full trace of your services! Big thanks for @draincoder for that!

First of all you need to install required dependencies to support OpenTelemetry:

pip install faststream[otel]

Then you can just add a middleware for your broker and that's it!

from faststream import FastStream
from faststream.nats import NatsBroker
from faststream.nats.opentelemetry import NatsTelemetryMiddleware

broker = NatsBroker(
    middlewares=(
        NatsTelemetryMiddleware(),
    )
)
app = FastStream(broker)

To find detail information just visit our documentation about telemetry

P.S. The release includes basic OpenTelemetry support - messages tracing & basic metrics. Baggage support and correct spans linking in batch processing case will be added soon.

  • fix: serialize TestClient rpc output to mock the real message by @Lancetnik in https://github.com/airtai/faststream/pull/1452
  • feature (#916): Observability by @draincoder in https://github.com/airtai/faststream/pull/1398

New Contributors#

  • @draincoder made their first contribution in https://github.com/airtai/faststream/pull/1398

Full Changelog: https://github.com/airtai/faststream/compare/0.5.6...0.5.7

0.5.6#

What's Changed#

  • feature: add --factory param by @Sehat1137 in #1440
  • feat: add RMQ channels options, support for prefix for routing_key, a… by @Lancetnik in #1448
  • feature: Add from faststream.rabbit.annotations import Connection, Channel shortcuts
  • Bugfix: RabbitMQ RabbitRouter prefix now affects to queue routing key as well
  • Feature (close #1402): add broker.add_middleware public API to append a middleware to already created broker
  • Feature: add RabbitBroker(channel_number: int, publisher_confirms: bool, on_return_raises: bool) options to setup channel settings
  • Feature (close #1447): add StreamMessage.batch_headers attribute to provide with access to whole batch messages headers

New Contributors#

Full Changelog: #0.5.5...0.5.6

0.5.5#

What's Changed#

Add support for explicit partition assignment in aiokafka KafkaBroker (special thanks to @spataphore1337):

from faststream import FastStream
from faststream.kafka import KafkaBroker, TopicPartition

broker = KafkaBroker()

topic_partition_first = TopicPartition("my_topic", 1)
topic_partition_second = TopicPartition("my_topic", 2)

@broker.subscribe(partitions=[topic_partition_first, topic_partition_second])
async def some_consumer(msg):
   ...

Full Changelog: #0.5.4...0.5.5

0.5.4#

What's Changed#

  • Update Release Notes for 0.5.3 by @faststream-release-notes-updater in #1400
  • fix (#1415): raise SetupError if rpc and reply_to are using in TestCL… by @Lancetnik in #1419
  • Chore/update deps2 by @Lancetnik in #1418
  • refactor: correct security with kwarg params merging by @Lancetnik in #1417
  • fix (#1414): correct Message.ack error processing by @Lancetnik in #1420

Full Changelog: #0.5.3...0.5.4

0.5.3#

What's Changed#

Full Changelog: #0.5.2...0.5.3

0.5.2#

What's Changed#

Just a little bugfix patch. Fixes #1379 and #1376.

Full Changelog: #0.5.1...0.5.2

0.5.1#

What's Changed#

We already have some fixes related to RedisBroker (#1375, #1376) and some new features for you:

  1. Now broke.include_router(...) allows to pass some arguments to setup router at including moment instead of creation
broker.include_router(
   router,
   prefix="test_",
   dependencies=[Depends(...)],
   middlewares=[BrokerMiddleware],
   include_in_schema=False,
)
  1. KafkaBroker().subscriber(...) now consumes aiokafka.ConsumerRebalanceListener object. You can find more information about it in the official aiokafka doc

(close #1319)

broker = KafkaBroker()

broker.subscriber(..., listener=MyRebalancer())

pattern option was added too, but it is still experimental and does not support Path

  1. Path feature performance was increased. Also, Path is suitable for NATS PullSub batch subscription as well now.
from faststream import NatsBroker, PullSub

broker = NastBroker()

@broker.subscriber(
    "logs.{level}",
    steam="test-stream",
    pull_sub=PullSub(batch=True),
)
async def base_handler(
    ...,
    level: str = Path(),
):
  ...
  • Update Release Notes for 0.5.0 by @faststream-release-notes-updater in https://github.com/airtai/faststream/pull/1366
  • chore: bump version by @Lancetnik in https://github.com/airtai/faststream/pull/1372
  • feat: kafka listener, extended include_router by @Lancetnik in https://github.com/airtai/faststream/pull/1374
  • Fix/1375 by @Lancetnik in https://github.com/airtai/faststream/pull/1377

Full Changelog: https://github.com/airtai/faststream/compare/0.5.0...0.5.1

0.5.0#

What's Changed#

This is the biggest change since the creation of FastStream. We have completely refactored the entire package, changing the object registration mechanism, message processing pipeline, and application lifecycle. However, you won't even notice it—we've preserved all public APIs from breaking changes. The only feature not compatible with the previous code is the new middleware.

New features:

  1. await FastStream.stop() method and StopApplication exception to stop a FastStream worker are added.

  2. broker.subscriber() and router.subscriber() functions now return a Subscriber object you can use later.

subscriber = broker.subscriber("test")

@subscriber(filter = lambda msg: msg.content_type == "application/json")
async def handler(msg: dict[str, Any]):
    ...

@subscriber()
async def handler(msg: dict[str, Any]):
    ...
 ```

This is the preferred syntax for [filtering](https://faststream.airt.ai/latest/getting-started/subscription/filtering/) now (the old one will be removed in `0.6.0`)

 3. The `router.publisher()` function now returns the correct `Publisher` object you can use later (after broker startup).

 ```python
 publisher = router.publisher("test")

 @router.subscriber("in")
 async def handler():
     await publisher.publish("msg")
 ```

 (Until `0.5.0` you could use it in this way with `broker.publisher` only)

 4. A list of `middlewares` can be passed to a `broker.publisher` as well:

 ```python
 broker = Broker(..., middlewares=())

 @broker.subscriber(..., middlewares=())
 @broker.publisher(..., middlewares=())  # new feature
 async def handler():
     ...
 ```

5. Broker-level middlewares now affect all ways to publish a message, so you can encode application outgoing messages here.

6. ⚠️ BREAKING CHANGE ⚠️ : both `subscriber` and `publisher` middlewares should be async context manager type

```python
async def subscriber_middleware(call_next, msg):
    return await call_next(msg)

async def publisher_middleware(call_next, msg, **kwargs):
    return await call_next(msg, **kwargs)

@broker.subscriber(
    "in",
    middlewares=(subscriber_middleware,),
)
@broker.publisher(
    "out",
    middlewares=(publisher_middleware,),
)
async def handler(msg):
    return msg

Such changes allow you two previously unavailable features: * suppress any exceptions and pass fall-back message body to publishers, and * patch any outgoing message headers and other parameters.

Without those features we could not implement Observability Middleware or any similar tool, so it is the job that just had to be done. 7. A better FastAPI compatibility: fastapi.BackgroundTasks and response_class subscriber option are supported.

  1. All .pyi files are removed, and explicit docstrings and methods options are added.

  2. New subscribers can be registered in runtime (with an already-started broker):

subscriber = broker.subscriber("dynamic")
subscriber(handler_method)
...
broker.setup_subscriber(subscriber)
await subscriber.start()
...
await subscriber.close()
  1. faststream[docs] distribution is removed.

  2. Update Release Notes for 0.4.7 by @faststream-release-notes-updater in https://github.com/airtai/faststream/pull/1295

  3. 1129 - Create a publish command for the CLI by @MRLab12 in https://github.com/airtai/faststream/pull/1151
  4. Chore: packages upgraded by @davorrunje in https://github.com/airtai/faststream/pull/1306
  5. docs: fix typos by @omahs in https://github.com/airtai/faststream/pull/1309
  6. chore: update dependencies by @Lancetnik in https://github.com/airtai/faststream/pull/1323
  7. docs: fix misc by @Lancetnik in https://github.com/airtai/faststream/pull/1324
  8. docs (#1327): correct RMQ exchanges behavior by @Lancetnik in https://github.com/airtai/faststream/pull/1328
  9. fix: typer 0.12 exclude by @Lancetnik in https://github.com/airtai/faststream/pull/1341
  10. 0.5.0 by @Lancetnik in https://github.com/airtai/faststream/pull/1326
  11. close #1103
  12. close #840
  13. fix #690
  14. fix #1206
  15. fix #1227
  16. close #568
  17. close #1303
  18. close #1287
  19. feat #607
  20. Generate docs and linter fixes by @davorrunje in https://github.com/airtai/faststream/pull/1348
  21. Fix types by @davorrunje in https://github.com/airtai/faststream/pull/1349
  22. chore: update dependencies by @Lancetnik in https://github.com/airtai/faststream/pull/1358
  23. feat: final middlewares by @Lancetnik in https://github.com/airtai/faststream/pull/1357
  24. Docs/0.5.0 features by @Lancetnik in https://github.com/airtai/faststream/pull/1360

New Contributors#

  • @MRLab12 made their first contribution in https://github.com/airtai/faststream/pull/1151
  • @omahs made their first contribution in https://github.com/airtai/faststream/pull/1309

Full Changelog: https://github.com/airtai/faststream/compare/0.4.7...0.5.0

0.5.0rc2#

What's Changed#

This is the final API change before stable 0.5.0 release

⚠️ HAS BREAKING CHANGE

In it, we stabilize the behavior of publishers & subscribers middlewares

async def subscriber_middleware(call_next, msg):
    return await call_next(msg)

async def publisher_middleware(call_next, msg, **kwargs):
    return await call_next(msg, **kwargs)

@broker.subscriber(
    "in",
    middlewares=(subscriber_middleware,),
)
@broker.publisher(
    "out",
    middlewares=(publisher_middleware,),
)
async def handler(msg):
    return msg

Such changes allows you two features previously unavailable

  • suppress any exceptions and pas fall-back message body to publishers
  • patch any outgoing message headers and other parameters

Without these features we just can't implement Observability Middleware or any similar tool, so it is the job to be done.

Now you are free to get access at any message processing stage and we are one step closer to the framework we would like to create!

  • Update Release Notes for 0.5.0rc0 by @faststream-release-notes-updater in https://github.com/airtai/faststream/pull/1347
  • Generate docs and linter fixes by @davorrunje in https://github.com/airtai/faststream/pull/1348
  • Fix types by @davorrunje in https://github.com/airtai/faststream/pull/1349
  • chore: update dependencies by @Lancetnik in https://github.com/airtai/faststream/pull/1358
  • feat: final middlewares by @Lancetnik in https://github.com/airtai/faststream/pull/1357

Full Changelog: https://github.com/airtai/faststream/compare/0.5.0rc0...0.5.0rc2

0.5.0rc0#

What's Changed#

This is the biggest change since the creation of FastStream. We have completely refactored the entire package, changing the object registration mechanism, message processing pipeline, and application lifecycle. However, you won't even notice it—we've preserved all public APIs from breaking changes. The only feature not compatible with the previous code is the new middleware.

This is still an RC (Release Candidate) for you to test before the stable release. You can manually install it in your project:

pip install faststream==0.5.0rc0

We look forward to your feedback!

New features:

  1. await FastStream.stop() method and StopApplication exception to stop a FastStream worker are added.

  2. broker.subscriber() and router.subscriber() functions now return a Subscriber object you can use later.

subscriber = broker.subscriber("test")

@subscriber(filter = lambda msg: msg.content_type == "application/json")
async def handler(msg: dict[str, Any]):
    ...

@subscriber()
async def handler(msg: dict[str, Any]):
    ...
 ```

This is the preferred syntax for [filtering](https://faststream.airt.ai/latest/getting-started/subscription/filtering/) now (the old one will be removed in `0.6.0`)

 3. The `router.publisher()` function now returns the correct `Publisher` object you can use later (after broker startup).

 ```python
 publisher = router.publisher("test")

 @router.subscriber("in")
 async def handler():
     await publisher.publish("msg")
 ```

 (Until `0.5.0` you could use it in this way with `broker.publisher` only)

 4. A list of `middlewares` can be passed to a `broker.publisher` as well:

 ```python
 broker = Broker(..., middlewares=())

 @broker.subscriber(..., middlewares=())
 @broker.publisher(..., middlewares=())  # new feature
 async def handler():
     ...
 ```

5. Broker-level middlewares now affect all ways to publish a message, so you can encode application outgoing messages here.

6. ⚠️ BREAKING CHANGE ⚠️ : both `subscriber` and `publisher` middlewares should be async context manager type

```python
from contextlib import asynccontextmanager

@asynccontextmanager
async def subscriber_middleware(msg_body):
    yield msg_body

@asynccontextmanager
async def publisher_middleware(
    msg_to_publish,
    **publish_arguments,
):
    yield msg_to_publish

@broker.subscriber("in", middlewares=(subscriber_middleware,))
@broker.publisher("out", middlewares=(publisher_middleware,))
async def handler():
    ...
  1. A better FastAPI compatibility: fastapi.BackgroundTasks and response_class subscriber option are supported.

  2. All .pyi files are removed, and explicit docstrings and methods options are added.

  3. New subscribers can be registered in runtime (with an already-started broker):

subscriber = broker.subscriber("dynamic")
subscriber(handler_method)
...
broker.setup_subscriber(subscriber)
await subscriber.start()
...
await subscriber.close()
  1. faststream[docs] distribution is removed.

  2. Update Release Notes for 0.4.7 by @faststream-release-notes-updater in https://github.com/airtai/faststream/pull/1295

  3. 1129 - Create a publish command for the CLI by @MRLab12 in https://github.com/airtai/faststream/pull/1151
  4. Chore: packages upgraded by @davorrunje in https://github.com/airtai/faststream/pull/1306
  5. docs: fix typos by @omahs in https://github.com/airtai/faststream/pull/1309
  6. chore: update dependencies by @Lancetnik in https://github.com/airtai/faststream/pull/1323
  7. docs: fix misc by @Lancetnik in https://github.com/airtai/faststream/pull/1324
  8. docs (#1327): correct RMQ exchanges behavior by @Lancetnik in https://github.com/airtai/faststream/pull/1328
  9. fix: typer 0.12 exclude by @Lancetnik in https://github.com/airtai/faststream/pull/1341
  10. 0.5.0 by @Lancetnik in https://github.com/airtai/faststream/pull/1326
  11. close #1103
  12. close #840
  13. fix #690
  14. fix #1206
  15. fix #1227
  16. close #568
  17. close #1303
  18. close #1287
  19. feat #607

New Contributors#

  • @MRLab12 made their first contribution in https://github.com/airtai/faststream/pull/1151
  • @omahs made their first contribution in https://github.com/airtai/faststream/pull/1309

Full Changelog: https://github.com/airtai/faststream/compare/0.4.7...0.5.0rc0

0.4.7#

What's Changed#

  • Update Release Notes for 0.4.6 by @faststream-release-notes-updater in #1286
  • fix (#1263): correct nested discriminator msg type AsyncAPI schema by @Lancetnik in #1288
  • docs: add apply_types warning notice to subscription/index.md by @Lancetnik in #1291
  • chore: fixed nats-py version by @Lancetnik in #1294

Full Changelog: #0.4.6...0.4.7

0.4.6#

What's Changed#

Full Changelog: #0.4.5...0.4.6

0.4.5#

What's Changed#

  • Update Release Notes for 0.4.4 by @faststream-release-notes-updater in #1260
  • Removed unused pytest dependency from redis/schemas.py by @ashambalev in #1261
  • chore: bumped package versions by @davorrunje in #1270
  • fix (#1263): correct AsyncAPI schema in discriminator case by @Lancetnik in #1272

New Contributors#

Full Changelog: #0.4.4...0.4.5

0.4.4#

What's Changed#

Add RedisStream batch size option

@broker.subscriber(stream=StreamSub("input", batch=True, max_records=3))
async def on_input_data(msgs: list[str]):
    assert len(msgs) <= 3
  • Update Release Notes for 0.4.3 by @faststream-release-notes-updater in #1247
  • docs: add manual run section by @Lancetnik in #1249
  • feat (#1252): respect Redis StreamSub last_id with consumer group by @Lancetnik in #1256
  • fix: correct Redis consumer group behavior by @Lancetnik in #1258
  • feat: add Redis Stream max_records option by @Lancetnik in #1259

Full Changelog: #0.4.3...0.4.4

0.4.3#

What's Changed#

Allow to specify Redis Stream maxlen option in publisher:

@broker.publisher(stream=StreamSub("Output", max_len=10))
async def on_input_data():
    ....

Full Changelog: #0.4.2...0.4.3

0.4.2#

What's Changed#

Bug fixes#

Full Changelog: #0.4.1...0.4.2

0.4.1#

What's Changed#

Bug fixes#

Documentation#

Full Changelog: #0.4.0...0.4.1

0.4.0#

What's Changed#

This release adds support for the Confluent's Python Client for Apache Kafka (TM). Confluent's Python Client for Apache Kafka does not support natively async functions and its integration with modern async-based services is a bit trickier. That was the reason why our initial supported by Kafka broker used aiokafka. However, that choice was a less fortunate one as it is as well maintained as the Confluent version. After receiving numerous requests, we finally decided to bite the bullet and create an async wrapper around Confluent's Python Client and add full support for it in FastStream.

If you want to try it out, install it first with:

pip install "faststream[confluent]>=0.4.0"

To connect to Kafka using the FastStream KafkaBroker module, follow these steps:

  1. Initialize the KafkaBroker instance: Start by initializing a KafkaBroker instance with the necessary configuration, including Kafka broker address.

  2. Create your processing logic: Write a function that will consume the incoming messages in the defined format and produce a response to the defined topic

  3. Decorate your processing function: To connect your processing function to the desired Kafka topics you need to decorate it with @broker.subscriber(...) and @broker.publisher(...) decorators. Now, after you start your application, your processing function will be called whenever a new message in the subscribed topic is available and produce the function return value to the topic defined in the publisher decorator.

Here's a simplified code example demonstrating how to establish a connection to Kafka using FastStream's KafkaBroker module:

from faststream import FastStream
from faststream.confluent import KafkaBroker

broker = KafkaBroker("localhost:9092")
app = FastStream(broker)

@broker.subscriber("in-topic")
@broker.publisher("out-topic")
async def handle_msg(user: str, user_id: int) -> str:
    return f"User: {user_id} - {user} registered"

For more information, please visit the documentation at:

https://faststream.airt.ai/latest/confluent/

List of Changes#

  • Update Release Notes for 0.3.13 by @faststream-release-notes-updater in https://github.com/airtai/faststream/pull/1119
  • docs: close #1125 by @Lancetnik in https://github.com/airtai/faststream/pull/1126
  • Add support for confluent python lib by @kumaranvpl in https://github.com/airtai/faststream/pull/1042
  • Update tutorial docs to include confluent code examples by @kumaranvpl in https://github.com/airtai/faststream/pull/1131
  • Add installation instructions for confluent by @kumaranvpl in https://github.com/airtai/faststream/pull/1132
  • Update Release Notes for 0.4.0rc0 by @faststream-release-notes-updater in https://github.com/airtai/faststream/pull/1130
  • chore: remove useless branch from CI by @Lancetnik in https://github.com/airtai/faststream/pull/1135
  • chore: bump mkdocs-git-revision-date-localized-plugin from 1.2.1 to 1.2.2 by @dependabot in https://github.com/airtai/faststream/pull/1140
  • chore: strict fast-depends version by @Lancetnik in https://github.com/airtai/faststream/pull/1145
  • chore: update copyright by @Lancetnik in https://github.com/airtai/faststream/pull/1144
  • fix: correct Windows shutdown by @Lancetnik in https://github.com/airtai/faststream/pull/1148
  • docs: fix typo by @saroz014 in https://github.com/airtai/faststream/pull/1154
  • Middleware Document Syntax Error by @SepehrBazyar in https://github.com/airtai/faststream/pull/1156
  • fix: correct FastAPI Context type hints by @Lancetnik in https://github.com/airtai/faststream/pull/1155
  • Fix bug which results in lost confluent coverage report by @kumaranvpl in https://github.com/airtai/faststream/pull/1160
  • Fix failing ack tests for confluent by @kumaranvpl in https://github.com/airtai/faststream/pull/1166
  • Update version to 0.4.0 and update docs by @kumaranvpl in https://github.com/airtai/faststream/pull/1171
  • feat #1180: add StreamRouter.on_broker_shutdown hook by @Lancetnik in https://github.com/airtai/faststream/pull/1182
  • Fix bug - using old upload-artifact version by @kumaranvpl in https://github.com/airtai/faststream/pull/1183
  • Release 0.4.0 by @davorrunje in https://github.com/airtai/faststream/pull/1184

New Contributors#

  • @saroz014 made their first contribution in https://github.com/airtai/faststream/pull/1154

Full Changelog: https://github.com/airtai/faststream/compare/0.3.13...0.4.0

0.4.0rc0#

What's Changed#

This is a preview version of 0.4.0 release introducing support for Confluent-based Kafka broker.

Here's a simplified code example demonstrating how to establish a connection to Kafka using FastStream's KafkaBroker module:

from faststream import FastStream
from faststream.confluent import KafkaBroker

broker = KafkaBroker("localhost:9092")
app = FastStream(broker)

@broker.subscriber("in-topic")
@broker.publisher("out-topic")
async def handle_msg(user: str, user_id: int) -> str:
    return f"User: {user_id} - {user} registered"

Changes#

Full Changelog: #0.3.13...0.4.0rc0

0.3.13#

What's Changed#

New features#

Bug fixes#

  • Fix minor typos in documentation and code by @mj0nez in #1116

New Contributors#

Full Changelog: #0.3.12...0.3.13

0.3.12#

What's Changed#

Bug fixes#

  • fix (#1110): correct RMQ Topic pattern test publish by @Lancetnik in #1112

Misc#

Full Changelog: #0.3.11...0.3.12

0.3.11#

What's Changed#

NATS concurrent subscriber:

By default, NATS subscriber consumes messages with a block per subject. So, you can't process multiple messages from the same subject at the same time. But, with the broker.subscriber(..., max_workers=...) option, you can! It creates an async tasks pool to consume multiple messages from the same subject and allows you to process them concurrently!

from faststream import FastStream
from faststream.nats import NatsBroker

broker = NatsBroker()
app = FastStream()

@broker.subscriber("test-subject", max_workers=10)
async def handler(...):
   """Can process up to 10 messages in the same time."""
  • Update Release Notes for 0.3.10 by @faststream-release-notes-updater in #1091
  • fix (#1100): FastAPI 0.106 compatibility by @Lancetnik in #1102

Full Changelog: #0.3.10...0.3.11

0.3.10#

What's Changed#

New features#

Bug fixes#

  • fix (#1087): add app_dir option to docs serve/gen commands by @Lancetnik in #1088

Documentation#

Other#

Full Changelog: #0.3.9...0.3.10

0.3.9#

What's Changed#

Bug fixes:#

  • fix (#1082): correct NatsTestClient stream publisher by @Lancetnik in #1083

Chore:#

Full Changelog: #0.3.8...0.3.9

0.3.8#

What's Changed#

Full Changelog: #0.3.7...0.3.8

0.3.7#

What's Changed#

Support regular FastStream Context with FastAPI plugin

from fastapi import FastAPI
from faststream.redis.fastapi import RedisRouter, Logger

router = RedisRouter()

@router.subscriber("test")
async def handler(msg, logger: Logger):
    logger.info(msg)

app = FastAPI(lifespan=router.lifespan_context)
app.include_router(router)

Full Changelog: #0.3.6...0.3.7

0.3.6#

What's Changed#

  • chore: correct update release CI by @Lancetnik in #1050
  • Update Release Notes for main by @faststream-release-notes-updater in #1051
  • chore: fix building docs script by @davorrunje in #1055
  • 0.3.6 by @Lancetnik in #1056
  • bug: remove packaging dependency
  • bug: correct FastAPI batch consuming
  • docs: add search meta to all pages
  • docs: polish all pages styles, fix typos
  • chore: add ruff rule to check print

Full Changelog: #0.3.5...0.3.6

0.3.5#

What's Changed#

A large update by @Lancetnik in #1048

Provides with the ability to setup graceful_timeout to wait for consumed messages processed correctly before application shutdown - Broker(graceful_timeout=30.0) (waits up to 30 seconds)

  • allows to get access to context.get_local("message") from FastAPI plugin
  • docs: fix Avro custom serialization example
  • docs: add KafkaBroker publish_batch notice
  • docs: add RabbitMQ security page
  • fix: respect retry attempts with NackMessage exception
  • test Kafka nack and reject behavior
  • bug: fix import error with anyio version 4.x by @davorrunje in #1049

Full Changelog: #0.3.4...0.3.5

0.3.4#

What's Changed#

Features:#

Documentation#

Chore#

Full Changelog: #0.3.3...0.3.4

0.3.3#

What's Changed#

Features:

Chores:

Full Changelog: #0.3.2...0.3.3

0.3.2#

What's Changed#

New features:#

Chore:#

Full Changelog: #0.3.1...0.3.2

0.3.1#

What's Changed#

Features:

Bug fixes:

  • fix: non-payload information injected included in AsyncAPI docs by @Lancetnik in #1015

Documentation:

  • docs: fix misspelled FastDepends reference in README.md by @spectacularfailure in #1013

New Contributors#

  • @spectacularfailure made their first contribution in #1013

Full Changelog: #0.3.0...0.3.1

0.3.0#

What's Changed#

The main feature of the 0.3.0 release is added Redis support by @Lancetnik in #1003

You can install it by the following command:

pip install "faststream[redis]"

Here is a little code example

from faststream import FastStream, Logger
from faststream.redis import RedisBroker

broker = RedisBroker()
app = FastStream(broker)

@broker.subscriber(
    channel="test",  # or
    # list="test",     or
    # stream="test",
)
async def handle(msg: str, logger: Logger):
    logger.info(msg)

Other features#

  • feat: show reload directories with --reload flag by @Lancetnik in #981
  • feat: implement validate and no_ack subscriber options (#926) by @mihail8531 in #988
  • other features by @Lancetnik in #1003
    • Improve error logs (missing CLI arguments, undefined starting)
    • Add faststream docs serve --reload ... option for documentation hotreload
    • Add faststream run --reload-extension .env option to watch by changes in such files
    • Support faststream run -k 1 -k 2 ... as k=["1", "2"] extra options
    • Add subscriber, publisher and router include_in_schema: bool argument to disable AsyncAPI render
    • remove watchfiles from default distribution
    • Allow create broker.publisher(...) with already running broker
    • FastAPI-like lifespan FastStream application context manager
    • automatic TestBroker(connect_only=...) argument based on AST
    • add NatsMessage.in_progress() method

Testing#

Documentation#

Chore#

New Contributors#

Full Changelog: #0.2.15...0.3.0

0.3.0rc0#

What's Changed#

The main feature of the 0.3.x release is added Redis support by @Lancetnik in #1003

You can install it manually:

pip install faststream==0.3.0rc0 && pip install "faststream[redis]"

Other features#

  • feat: show reload directories with --reload flag by @Lancetnik in #981
  • Improve error logs (missing CLI arguments, undefined starting)
  • Add faststream docs serve --reload ... option for documentation hotreload
  • Add faststream run --reload-extension .env option to watch by changes in such files
  • Support faststream run -k 1 -k 2 ... as k=["1", "2"] extra options
  • Add subscriber, publisher and router include_in_schema: bool argument to disable AsyncAPI render
  • remove watchfiles from default distribution
  • Allow create @broker.publisher(...) with already running broker
  • FastAPI-like lifespan FastStream application context manager
  • automatic TestBroker(connect_only=...) argument based on AST
  • add NatsMessage.in_progress() method

Testing#

Documentation#

Chore#

New Contributors#

Full Changelog: #0.2.15...0.3.0rc0

0.2.15#

What's Changed#

Bug fixes#

Documentation#

Misc#

Full Changelog: https://github.com/airtai/faststream/compare/0.2.14...0.2.15

0.2.14#

What's Changed#

Bug fixes#

Documentation#

Misc#

Full Changelog: https://github.com/airtai/faststream/compare/0.2.13...0.2.14

0.2.13#

What's Changed#

Full Changelog: https://github.com/airtai/faststream/compare/0.2.12...0.2.13

0.2.12#

What's Changed#

Full Changelog: https://github.com/airtai/faststream/compare/0.2.11...0.2.12

0.2.11#

What's Changed#

Bug fixes#

Documentation#

New Contributors#

Full Changelog: https://github.com/airtai/faststream/compare/0.2.10...0.2.11

Documentation#

New Contributors#

Full Changelog: https://github.com/airtai/faststream/compare/0.2.10...0.2.11

0.2.10#

What's Changed#

Now, you can hide your connection secrets in the AsyncAPI schema by manually setting up the server URL:

broker = RabbitBroker(
    "amqp://guest:guest@localhost:5672/",  # Connection URL
    asyncapi_url="amqp://****:****@localhost:5672/",  # Public schema URL
)

Additionally, the RabbitMQ AsyncAPI schema has been improved, adding support for faststream.security, and the connection scheme is now defined automatically.

RabbitMQ connection parameters are now merged, allowing you to define the main connection data as a URL string and customize it using kwargs:

broker = RabbitBroker(
    "amqp://guest:guest@localhost:5672/",
    host="127.0.0.1",
)

# amqp://guest:guest@127.0.0.1:5672/ - The final URL
* A more suitable faststream.security import instead of faststream.broker.security * chore: add release notes for 0.2.9 by @kumaranvpl in https://github.com/airtai/faststream/pull/894 * chore: upgrade packages by @davorrunje in https://github.com/airtai/faststream/pull/901 * chore: use js redirect and redirect to version by @kumaranvpl in https://github.com/airtai/faststream/pull/902 * feat: add asyncapi_url broker arg by @Lancetnik in https://github.com/airtai/faststream/pull/903

Full Changelog: https://github.com/airtai/faststream/compare/0.2.9...0.2.10

0.2.9#

What's Changed#

New Contributors#

Full Changelog: https://github.com/airtai/faststream/compare/0.2.8...0.2.9

0.2.8#

What's Changed#

New Contributors#

Full Changelog: https://github.com/airtai/faststream/compare/0.2.7...0.2.8

0.2.7#

What's Changed#

Full Changelog: https://github.com/airtai/faststream/compare/0.2.6...0.2.7

0.2.6#

What's Changed#

New Contributors#

Full Changelog: https://github.com/airtai/faststream/compare/0.2.5...0.2.6

0.2.5#

What's Changed#

Full Changelog: https://github.com/airtai/faststream/compare/0.2.4...0.2.5

0.2.4#

New Functionalities#

Now, Context provides access to inner dict keys too:

# headers is a `dict`
async def handler(
  user_id: int = Context("message.headers.user_id", cast=True),
): ...

Added Header object as a shortcut to Context("message.headers.") inner fields (NATS example):

# the same with the previous example
async def handler(
  user_id: int = Header(),
  u_id: int = Header("user_id"),  # with custom name
): ...

Added Path object to get access to NATS wildcard subject or RabbitMQ topic routing key (a shortcut to access Context("message.path.") as well):

@nats_broker.subscriber("logs.{level}")
async def handler(
  level: str = Path(),
)

Also, the original message Context annotation was copied from faststream.[broker].annotations.[Broker]Message to faststream.[broker].[Broker]Message to provide you with faster access to the most commonly used object (NATS example).

What's Changed#

Full Changelog: https://github.com/airtai/faststream/compare/0.2.3...0.2.4

0.2.3#

What's Changed#

Full Changelog: https://github.com/airtai/faststream/compare/0.2.2...0.2.3

0.2.2#

What's Changed#

New Contributors#

Full Changelog: https://github.com/airtai/faststream/compare/0.2.1...0.2.2

0.2.1#

What's Changed#

Full Changelog: https://github.com/airtai/faststream/compare/0.2.0...0.2.1

0.2.0#

What's Changed#

Full Changelog: https://github.com/airtai/faststream/compare/0.1.6...0.2.0

0.1.6#

What's Changed#

Full Changelog: https://github.com/airtai/faststream/compare/0.1.5...0.1.6

0.1.4#

What's Changed#

New Contributors#

Full Changelog: https://github.com/airtai/faststream/compare/0.1.3...0.1.4

0.1.3#

What's Changed#

Full Changelog: https://github.com/airtai/faststream/compare/0.1.1...0.1.3

0.1.1#

What's Changed#

Full Changelog: https://github.com/airtai/faststream/commits/0.1.1

0.1.0#

FastStream is a new package based on the ideas and experiences gained from FastKafka and Propan. By joining our forces, we picked up the best from both packages and created the unified way to write services capable of processing streamed data regardless of the underlying protocol. We'll continue to maintain both packages, but new development will be in this project. If you are starting a new service, this package is the recommended way to do it.

Features#

FastStream simplifies the process of writing producers and consumers for message queues, handling all the parsing, networking and documentation generation automatically.

Making streaming microservices has never been easier. Designed with junior developers in mind, FastStream simplifies your work while keeping the door open for more advanced use-cases. Here's a look at the core features that make FastStream a go-to framework for modern, data-centric microservices.

  • Multiple Brokers: FastStream provides a unified API to work across multiple message brokers (Kafka, RabbitMQ support)

  • Pydantic Validation: Leverage Pydantic's validation capabilities to serialize and validates incoming messages

  • Automatic Docs: Stay ahead with automatic AsyncAPI documentation.

  • Intuitive: full typed editor support makes your development experience smooth, catching errors before they reach runtime

  • Powerful Dependency Injection System: Manage your service dependencies efficiently with FastStream's built-in DI system.

  • Testable: supports in-memory tests, making your CI/CD pipeline faster and more reliable

  • Extendable: use extensions for lifespans, custom serialization and middlewares

  • Integrations: FastStream is fully compatible with any HTTP framework you want (FastAPI especially)

  • Built for Automatic Code Generation: FastStream is optimized for automatic code generation using advanced models like GPT and Llama

That's FastStream in a nutshell—easy, efficient, and powerful. Whether you're just starting with streaming microservices or looking to scale, FastStream has got you covered.