Skip to content

Release Notes#

0.5.15#

What's Changed#

Finally, FastStream has a Kafka pattern subscription! This is another step forward in our Roadmap moving us to 0.6.0 and futher!

from faststream import Path
from faststream.kafka import KafkaBroker

broker = KafkaBroker()

@broker.subscriber(pattern="logs.{level}")
async def base_handler(
    body: str,
    level: str = Path(),
):
    ...

Also, all brokers now supports a new ping method to check real broker connection

is_connected: bool = await broker.ping()

This is a little, but important change for K8S probes support

More other there are a lot of bugfixes and improvements from our contributors! Thanks to all of these amazing people!

  • feat(multiprocess): restart child processes if they are not alive by @gostilovichd in https://github.com/airtai/faststream/pull/1550
  • fix: use typing_extensions.TypedDict import by @Lancetnik in https://github.com/airtai/faststream/pull/1575
  • fix: correct single dataclass argument AsyncAPI payload generation by @Lancetnik in https://github.com/airtai/faststream/pull/1591
  • fix (#1598): use config with NATS PullSub by @Lancetnik in https://github.com/airtai/faststream/pull/1599
  • feat: default call_name for broker.subscriber by @KrySeyt in https://github.com/airtai/faststream/pull/1589
  • Feat: init ping method by @Flosckow in https://github.com/airtai/faststream/pull/1592
  • chore: bump nats-py requirement by @Lancetnik in https://github.com/airtai/faststream/pull/1600
  • fix: add pattern checking by @spataphore1337 in https://github.com/airtai/faststream/pull/1590

New Contributors#

  • @gostilovichd made their first contribution in https://github.com/airtai/faststream/pull/1550
  • @KrySeyt made their first contribution in https://github.com/airtai/faststream/pull/1589
  • @Flosckow made their first contribution in https://github.com/airtai/faststream/pull/1592

Full Changelog: https://github.com/airtai/faststream/compare/0.5.14...0.5.15

0.5.14#

What's Changed#

  • Update Release Notes for 0.5.13 by @faststream-release-notes-updater in #1548
  • Add allow_auto_create_topics to make automatic topic creation configurable by @kumaranvpl in #1556

Full Changelog: #0.5.13...0.5.14

0.5.13#

What's Changed#

New Contributors#

Full Changelog: #0.5.12...0.5.13

0.5.12#

What's Changed#

Now, FastStream provides users with the ability to pass the config dictionary to confluent-kafka-python for greater customizability. The following example sets the parameter topic.metadata.refresh.fast.interval.ms's value to 300 instead of the default value 100 via the config parameter.

from faststream import FastStream
from faststream.confluent import KafkaBroker

config = {"topic.metadata.refresh.fast.interval.ms": 300}
broker = KafkaBroker("localhost:9092", config=config)
app = FastStream(broker)
  • Update Release Notes for 0.5.11 by @faststream-release-notes-updater in #1511
  • docs: update filters example by @Lancetnik in #1516
  • Add config param to pass additional parameters to confluent-kafka-python by @kumaranvpl in #1505

Full Changelog: #0.5.11...0.5.12

0.5.11#

What's Changed#

New Contributors#

Full Changelog: #0.5.10...0.5.11

0.5.10#

What's Changed#

Now you can return Response class to set more specific outgoing message parameters:

from faststream import Response

@broker.subscriber("in")
@broker.subscriber("out")
async def handler():
    return Response(body=b"", headers={})

New Contributors#

Full Changelog: #0.5.9...0.5.10

0.5.9#

What's Changed#

  • Update Release Notes for 0.5.8 by @faststream-release-notes-updater in #1462
  • Exclude typing_extensions version 4.12.* by @kumaranvpl in #1467
  • fix: add group/consumer to hash to avoid overwriting by @fbraem in #1463
  • Bump version to 0.5.9 by @kumaranvpl in #1468

New Contributors#

Full Changelog: #0.5.8...0.5.9

0.5.8#

What's Changed#

This is the time for a new NATS features! FastStream supports NATS Key-Value and Object Storage subscribption features in a native way now (big thx for @sheldygg)!

  1. KeyValue creation and watching API added (you can read updated documentation section for changes):
 from faststream import FastStream, Logger
 from faststream.nats import NatsBroker

 broker = NatsBroker()
 app = FastStream(broker)

 @broker.subscriber("some-key", kv_watch="bucket")
 async def handler(msg: int, logger: Logger):
     logger.info(msg)

 @app.after_startup
 async def test():
     kv = await broker.key_value("bucket")
     await kv.put("some-key", b"1")
  1. ObjectStore API added as well (you can read updated documentation section for changes):

    from faststream import FastStream, Logger
    from faststream.nats import NatsBroker
    
    broker = NatsBroker()
    app = FastStream(broker)
    
    @broker.subscriber("file-bucket", obj_watch=True)
    async def handler(filename: str, logger: Logger):
        logger.info(filename)
    
    @app.after_startup
    async def test():
        object_store = await broker.object_storage("file-bucket")
        await object_store.put("some-file.txt", b"1")
    
  2. Also now you can use just pull_sub=True instead of pull_sub=PullSub() in basic case:

```python from faststream import FastStream, Logger from faststream.nats import NatsBroker

broker = NatsBroker()
app = FastStream(broker)

@broker.subscriber("test", stream="stream", pull_sub=True)
async def handler(msg, logger: Logger):
    logger.info(msg)
```

Finally, we have a new feature, related to all brokers: special flag to suppress automatic RPC and reply_to responses:

@broker.subscriber("tests", no_reply=True)
async def handler():
    ....

# will fail with timeout, because there is no automatic response
msg = await broker.publish("msg", "test", rpc=True)
  • fix: when headers() returns None in AsyncConfluentParser, replace it with an empty tuple by @andreaimprovised in https://github.com/airtai/faststream/pull/1460
  • Implement Kv/Obj watch. by @sheldygg in https://github.com/airtai/faststream/pull/1383
  • feat: add subscriber no-reply option by @Lancetnik in https://github.com/airtai/faststream/pull/1461

New Contributors#

  • @andreaimprovised made their first contribution in https://github.com/airtai/faststream/pull/1460

Full Changelog: https://github.com/airtai/faststream/compare/0.5.7...0.5.8

0.5.7#

What's Changed#

Finally, FastStream supports OpenTelemetry in a native way to collect the full trace of your services! Big thanks for @draincoder for that!

First of all you need to install required dependencies to support OpenTelemetry:

pip install faststream[otel]

Then you can just add a middleware for your broker and that's it!

from faststream import FastStream
from faststream.nats import NatsBroker
from faststream.nats.opentelemetry import NatsTelemetryMiddleware

broker = NatsBroker(
    middlewares=(
        NatsTelemetryMiddleware(),
    )
)
app = FastStream(broker)

To find detailt information just visit our documentation about telemetry

P.S. The release includes basic OpenTelemetry support - messages tracing & basic metrics. Baggage support and correct spans linking in batch processing case will be added soon.

  • fix: serialize TestClient rpc output to mock the real message by @Lancetnik in https://github.com/airtai/faststream/pull/1452
  • feature (#916): Observability by @draincoder in https://github.com/airtai/faststream/pull/1398

New Contributors#

  • @draincoder made their first contribution in https://github.com/airtai/faststream/pull/1398

Full Changelog: https://github.com/airtai/faststream/compare/0.5.6...0.5.7

0.5.6#

What's Changed#

  • feature: add --factory param by @Sehat1137 in #1440
  • feat: add RMQ channels options, support for prefix for routing_key, a… by @Lancetnik in #1448
  • feature: Add from faststream.rabbit.annotations import Connection, Channel shortcuts
  • Bugfix: RabbitMQ RabbitRouter prefix now affects to queue routing key as well
  • Feature (close #1402): add broker.add_middleware public API to append a middleware to already created broker
  • Feature: add RabbitBroker(channel_number: int, publisher_confirms: bool, on_return_raises: bool) options to setup channel settings
  • Feature (close #1447): add StreamMessage.batch_headers attribute to provide with access to whole batch messages headers

New Contributors#

Full Changelog: #0.5.5...0.5.6

0.5.5#

What's Changed#

Add support for explicit partition assignment in aiokafka KafkaBroker (special thanks to @spataphore1337):

from faststream import FastStream
from faststream.kafka import KafkaBroker, TopicPartition

broker = KafkaBroker()

topic_partition_fisrt = TopicPartition("my_topic", 1)
topic_partition_second = TopicPartition("my_topic", 2)

@broker.subscribe(partitions=[topic_partition_fisrt, topic_partition_second])
async def some_consumer(msg):
   ...

Full Changelog: #0.5.4...0.5.5

0.5.4#

What's Changed#

  • Update Release Notes for 0.5.3 by @faststream-release-notes-updater in #1400
  • fix (#1415): raise SetupError if rpc and reply_to are using in TestCL… by @Lancetnik in #1419
  • Chore/update deps2 by @Lancetnik in #1418
  • refactor: correct security with kwarg params merging by @Lancetnik in #1417
  • fix (#1414): correct Message.ack error processing by @Lancetnik in #1420

Full Changelog: #0.5.3...0.5.4

0.5.3#

What's Changed#

Full Changelog: #0.5.2...0.5.3

0.5.2#

What's Changed#

Just a little bugfix patch. Fixes #1379 and #1376.

Full Changelog: #0.5.1...0.5.2

0.5.1#

What's Changed#

We already have some fixes related to RedisBroker (#1375, #1376) and some new features for you:

  1. Now broke.include_router(...) allows to pass some arguments to setup router at including moment instead of creation
broker.include_router(
   router,
   prefix="test_",
   dependencies=[Depends(...)],
   middlewares=[BrokerMiddleware],
   include_in_schema=False,
)
  1. KafkaBroker().subscriber(...) now consumes aiokafka.ConsumerRebalanceListener object. You can find more information about it in the official aiokafka doc

(close #1319)

broker = KafkaBroker()

broker.subscriber(..., listener=MyRebalancer())

pattern option was added too, but it is still experimental and does not support Path

  1. Path feature performance was increased. Also, Path is suitable for NATS PullSub batch subscription as well now.
from faststream import NatsBroker, PullSub

broker = NastBroker()

@broker.subscriber(
    "logs.{level}",
    steam="test-stream",
    pull_sub=PullSub(batch=True),
)
async def base_handler(
    ...,
    level: str = Path(),
):
  ...
  • Update Release Notes for 0.5.0 by @faststream-release-notes-updater in https://github.com/airtai/faststream/pull/1366
  • chore: bump version by @Lancetnik in https://github.com/airtai/faststream/pull/1372
  • feat: kafka listener, extended include_router by @Lancetnik in https://github.com/airtai/faststream/pull/1374
  • Fix/1375 by @Lancetnik in https://github.com/airtai/faststream/pull/1377

Full Changelog: https://github.com/airtai/faststream/compare/0.5.0...0.5.1

0.5.0#

What's Changed#

This is the biggest change since the creation of FastStream. We have completely refactored the entire package, changing the object registration mechanism, message processing pipeline, and application lifecycle. However, you won't even notice it—we've preserved all public APIs from breaking changes. The only feature not compatible with the previous code is the new middleware.

New features:

  1. await FastStream.stop() method and StopApplication exception to stop a FastStream worker are added.

  2. broker.subscriber() and router.subscriber() functions now return a Subscriber object you can use later.

subscriber = broker.subscriber("test")

@subscriber(filter = lambda msg: msg.content_type == "application/json")
async def handler(msg: dict[str, Any]):
    ...

@subscriber()
async def handler(msg: dict[str, Any]):
    ...
 ```

This is the preferred syntax for [filtering](https://faststream.airt.ai/latest/getting-started/subscription/filtering/) now (the old one will be removed in `0.6.0`)

 3. The `router.publisher()` function now returns the correct `Publisher` object you can use later (after broker startup).

 ```python
 publisher = router.publisher("test")

 @router.subscriber("in")
 async def handler():
     await publisher.publish("msg")
 ```

 (Until `0.5.0` you could use it in this way with `broker.publisher` only)

 4. A list of `middlewares` can be passed to a `broker.publisher` as well:

 ```python
 broker = Broker(..., middlewares=())

 @broker.subscriber(..., middlewares=())
 @broker.publisher(..., middlewares=())  # new feature
 async def handler():
     ...
 ```

5. Broker-level middlewares now affect all ways to publish a message, so you can encode application outgoing messages here.

6. ⚠️ BREAKING CHANGE ⚠️ : both `subscriber` and `publisher` middlewares should be async context manager type

```python
async def subscriber_middleware(call_next, msg):
    return await call_next(msg)

async def publisher_middleware(call_next, msg, **kwargs):
    return await call_next(msg, **kwargs)

@broker.subscriber(
    "in",
    middlewares=(subscriber_middleware,),
)
@broker.publisher(
    "out",
    middlewares=(publisher_middleware,),
)
async def handler(msg):
    return msg

Such changes allow you two previously unavailable features: * suppress any exceptions and pass fall-back message body to publishers, and * patch any outgoing message headers and other parameters.

Without those features we could not implement Observability Middleware or any similar tool, so it is the job that just had to be done. 7. A better FastAPI compatibility: fastapi.BackgroundTasks and response_class subscriber option are supported.

  1. All .pyi files are removed, and explicit docstrings and methods options are added.

  2. New subscribers can be registered in runtime (with an already-started broker):

subscriber = broker.subscriber("dynamic")
subscriber(handler_method)
...
broker.setup_subscriber(subscriber)
await subscriber.start()
...
await subscriber.close()
  1. faststream[docs] distribution is removed.

  2. Update Release Notes for 0.4.7 by @faststream-release-notes-updater in https://github.com/airtai/faststream/pull/1295

  3. 1129 - Create a publish command for the CLI by @MRLab12 in https://github.com/airtai/faststream/pull/1151
  4. Chore: packages upgraded by @davorrunje in https://github.com/airtai/faststream/pull/1306
  5. docs: fix typos by @omahs in https://github.com/airtai/faststream/pull/1309
  6. chore: update dependencies by @Lancetnik in https://github.com/airtai/faststream/pull/1323
  7. docs: fix misc by @Lancetnik in https://github.com/airtai/faststream/pull/1324
  8. docs (#1327): correct RMQ exhcanges behavior by @Lancetnik in https://github.com/airtai/faststream/pull/1328
  9. fix: typer 0.12 exclude by @Lancetnik in https://github.com/airtai/faststream/pull/1341
  10. 0.5.0 by @Lancetnik in https://github.com/airtai/faststream/pull/1326
  11. close #1103
  12. close #840
  13. fix #690
  14. fix #1206
  15. fix #1227
  16. close #568
  17. close #1303
  18. close #1287
  19. feat #607
  20. Generate docs and linter fixes by @davorrunje in https://github.com/airtai/faststream/pull/1348
  21. Fix types by @davorrunje in https://github.com/airtai/faststream/pull/1349
  22. chore: update dependencies by @Lancetnik in https://github.com/airtai/faststream/pull/1358
  23. feat: final middlewares by @Lancetnik in https://github.com/airtai/faststream/pull/1357
  24. Docs/0.5.0 features by @Lancetnik in https://github.com/airtai/faststream/pull/1360

New Contributors#

  • @MRLab12 made their first contribution in https://github.com/airtai/faststream/pull/1151
  • @omahs made their first contribution in https://github.com/airtai/faststream/pull/1309

Full Changelog: https://github.com/airtai/faststream/compare/0.4.7...0.5.0

0.5.0rc2#

What's Changed#

This is the final API change before stable 0.5.0 release

⚠️ HAS BREAKING CHANGE

In it, we stabilize the behavior of publihsers & subscribers middlewares

async def subscriber_middleware(call_next, msg):
    return await call_next(msg)

async def publisher_middleware(call_next, msg, **kwargs):
    return await call_next(msg, **kwargs)

@broker.subscriber(
    "in",
    middlewares=(subscriber_middleware,),
)
@broker.publisher(
    "out",
    middlewares=(publisher_middleware,),
)
async def handler(msg):
    return msg

Such changes allows you two features previously unavailable

  • suppress any exceptions and pas fall-back message body to publishers
  • patch any outgoing message headers and other parameters

Without these features we just can't implement Observability Middleware or any similar tool, so it is the job to be done.

Now you are free to get access at any message processing stage and we are one step closer to the framework we would like to create!

  • Update Release Notes for 0.5.0rc0 by @faststream-release-notes-updater in https://github.com/airtai/faststream/pull/1347
  • Generate docs and linter fixes by @davorrunje in https://github.com/airtai/faststream/pull/1348
  • Fix types by @davorrunje in https://github.com/airtai/faststream/pull/1349
  • chore: update dependencies by @Lancetnik in https://github.com/airtai/faststream/pull/1358
  • feat: final middlewares by @Lancetnik in https://github.com/airtai/faststream/pull/1357

Full Changelog: https://github.com/airtai/faststream/compare/0.5.0rc0...0.5.0rc2

0.5.0rc0#

What's Changed#

This is the biggest change since the creation of FastStream. We have completely refactored the entire package, changing the object registration mechanism, message processing pipeline, and application lifecycle. However, you won't even notice it—we've preserved all public APIs from breaking changes. The only feature not compatible with the previous code is the new middleware.

This is still an RC (Release Candidate) for you to test before the stable release. You can manually install it in your project:

pip install faststream==0.5.0rc0

We look forward to your feedback!

New features:

  1. await FastStream.stop() method and StopApplication exception to stop a FastStream worker are added.

  2. broker.subscriber() and router.subscriber() functions now return a Subscriber object you can use later.

subscriber = broker.subscriber("test")

@subscriber(filter = lambda msg: msg.content_type == "application/json")
async def handler(msg: dict[str, Any]):
    ...

@subscriber()
async def handler(msg: dict[str, Any]):
    ...
 ```

This is the preferred syntax for [filtering](https://faststream.airt.ai/latest/getting-started/subscription/filtering/) now (the old one will be removed in `0.6.0`)

 3. The `router.publisher()` function now returns the correct `Publisher` object you can use later (after broker startup).

 ```python
 publisher = router.publisher("test")

 @router.subscriber("in")
 async def handler():
     await publisher.publish("msg")
 ```

 (Until `0.5.0` you could use it in this way with `broker.publisher` only)

 4. A list of `middlewares` can be passed to a `broker.publisher` as well:

 ```python
 broker = Broker(..., middlewares=())

 @broker.subscriber(..., middlewares=())
 @broker.publisher(..., middlewares=())  # new feature
 async def handler():
     ...
 ```

5. Broker-level middlewares now affect all ways to publish a message, so you can encode application outgoing messages here.

6. ⚠️ BREAKING CHANGE ⚠️ : both `subscriber` and `publisher` middlewares should be async context manager type

```python
from contextlib import asynccontextmanager

@asynccontextmanager
async def subscriber_middleware(msg_body):
    yield msg_body

@asynccontextmanager
async def publisher_middleware(
    msg_to_publish,
    **publish_arguments,
):
    yield msg_to_publish

@broker.subscriber("in", middlewares=(subscriber_middleware,))
@broker.publisher("out", middlewares=(publisher_middleware,))
async def handler():
    ...
  1. A better FastAPI compatibility: fastapi.BackgroundTasks and response_class subscriber option are supported.

  2. All .pyi files are removed, and explicit docstrings and methods options are added.

  3. New subscribers can be registered in runtime (with an already-started broker):

subscriber = broker.subscriber("dynamic")
subscriber(handler_method)
...
broker.setup_subscriber(subscriber)
await subscriber.start()
...
await subscriber.close()
  1. faststream[docs] distribution is removed.

  2. Update Release Notes for 0.4.7 by @faststream-release-notes-updater in https://github.com/airtai/faststream/pull/1295

  3. 1129 - Create a publish command for the CLI by @MRLab12 in https://github.com/airtai/faststream/pull/1151
  4. Chore: packages upgraded by @davorrunje in https://github.com/airtai/faststream/pull/1306
  5. docs: fix typos by @omahs in https://github.com/airtai/faststream/pull/1309
  6. chore: update dependencies by @Lancetnik in https://github.com/airtai/faststream/pull/1323
  7. docs: fix misc by @Lancetnik in https://github.com/airtai/faststream/pull/1324
  8. docs (#1327): correct RMQ exhcanges behavior by @Lancetnik in https://github.com/airtai/faststream/pull/1328
  9. fix: typer 0.12 exclude by @Lancetnik in https://github.com/airtai/faststream/pull/1341
  10. 0.5.0 by @Lancetnik in https://github.com/airtai/faststream/pull/1326
  11. close #1103
  12. close #840
  13. fix #690
  14. fix #1206
  15. fix #1227
  16. close #568
  17. close #1303
  18. close #1287
  19. feat #607

New Contributors#

  • @MRLab12 made their first contribution in https://github.com/airtai/faststream/pull/1151
  • @omahs made their first contribution in https://github.com/airtai/faststream/pull/1309

Full Changelog: https://github.com/airtai/faststream/compare/0.4.7...0.5.0rc0

0.4.7#

What's Changed#

  • Update Release Notes for 0.4.6 by @faststream-release-notes-updater in #1286
  • fix (#1263): correct nested descriminator msg type AsyncAPI schema by @Lancetnik in #1288
  • docs: add apply_types warning notice to subscription/index.md by @Lancetnik in #1291
  • chore: fixed nats-py version by @Lancetnik in #1294

Full Changelog: #0.4.6...0.4.7

0.4.6#

What's Changed#

Full Changelog: #0.4.5...0.4.6

0.4.5#

What's Changed#

  • Update Release Notes for 0.4.4 by @faststream-release-notes-updater in #1260
  • Removed unused pytest dependency from redis/schemas.py by @ashambalev in #1261
  • chore: bumped package versions by @davorrunje in #1270
  • fix (#1263): correct AsyncAPI schema in descriminator case by @Lancetnik in #1272

New Contributors#

Full Changelog: #0.4.4...0.4.5

0.4.4#

What's Changed#

Add RedisStream batch size option

@broker.subscriber(stream=StreamSub("input", batch=True, max_records=3))
async def on_input_data(msgs: list[str]):
    assert len(msgs) <= 3
  • Update Release Notes for 0.4.3 by @faststream-release-notes-updater in #1247
  • docs: add manual run section by @Lancetnik in #1249
  • feat (#1252): respect Redis StreamSub last_id with consumer group by @Lancetnik in #1256
  • fix: correct Redis consumer group behavior by @Lancetnik in #1258
  • feat: add Redis Stream max_records option by @Lancetnik in #1259

Full Changelog: #0.4.3...0.4.4

0.4.3#

What's Changed#

Allow to specify Redis Stream maxlen option in publisher:

@broker.publisher(stream=StreamSub("Output", max_len=10))
async def on_input_data():
    ....

Full Changelog: #0.4.2...0.4.3

0.4.2#

What's Changed#

Bug fixes#

Full Changelog: #0.4.1...0.4.2

0.4.1#

What's Changed#

Bug fixes#

Documentation#

Full Changelog: #0.4.0...0.4.1

0.4.0#

What's Changed#

This release adds support for the Confluent's Python Client for Apache Kafka (TM). Confluent's Python Client for Apache Kafka does not support natively async functions and its integration with modern async-based services is a bit trickier. That was the reason why our initial supported by Kafka broker used aiokafka. However, that choice was a less fortunate one as it is as well maintained as the Confluent version. After receiving numerous requests, we finally decided to bite the bullet and create an async wrapper around Confluent's Python Client and add full support for it in FastStream.

If you want to try it out, install it first with:

pip install "faststream[confluent]>=0.4.0"

To connect to Kafka using the FastStream KafkaBroker module, follow these steps:

  1. Initialize the KafkaBroker instance: Start by initializing a KafkaBroker instance with the necessary configuration, including Kafka broker address.

  2. Create your processing logic: Write a function that will consume the incoming messages in the defined format and produce a response to the defined topic

  3. Decorate your processing function: To connect your processing function to the desired Kafka topics you need to decorate it with @broker.subscriber(...) and @broker.publisher(...) decorators. Now, after you start your application, your processing function will be called whenever a new message in the subscribed topic is available and produce the function return value to the topic defined in the publisher decorator.

Here's a simplified code example demonstrating how to establish a connection to Kafka using FastStream's KafkaBroker module:

from faststream import FastStream
from faststream.confluent import KafkaBroker

broker = KafkaBroker("localhost:9092")
app = FastStream(broker)

@broker.subscriber("in-topic")
@broker.publisher("out-topic")
async def handle_msg(user: str, user_id: int) -> str:
    return f"User: {user_id} - {user} registered"

For more information, please visit the documentation at:

https://faststream.airt.ai/latest/confluent/

List of Changes#

  • Update Release Notes for 0.3.13 by @faststream-release-notes-updater in https://github.com/airtai/faststream/pull/1119
  • docs: close #1125 by @Lancetnik in https://github.com/airtai/faststream/pull/1126
  • Add support for confluent python lib by @kumaranvpl in https://github.com/airtai/faststream/pull/1042
  • Update tutorial docs to include confluent code examples by @kumaranvpl in https://github.com/airtai/faststream/pull/1131
  • Add installation instructions for confluent by @kumaranvpl in https://github.com/airtai/faststream/pull/1132
  • Update Release Notes for 0.4.0rc0 by @faststream-release-notes-updater in https://github.com/airtai/faststream/pull/1130
  • chore: remove useless branch from CI by @Lancetnik in https://github.com/airtai/faststream/pull/1135
  • chore: bump mkdocs-git-revision-date-localized-plugin from 1.2.1 to 1.2.2 by @dependabot in https://github.com/airtai/faststream/pull/1140
  • chore: strict fast-depends version by @Lancetnik in https://github.com/airtai/faststream/pull/1145
  • chore: update copyright by @Lancetnik in https://github.com/airtai/faststream/pull/1144
  • fix: correct Windows shutdown by @Lancetnik in https://github.com/airtai/faststream/pull/1148
  • docs: fix typo by @saroz014 in https://github.com/airtai/faststream/pull/1154
  • Middleware Document Syntax Error by @SepehrBazyar in https://github.com/airtai/faststream/pull/1156
  • fix: correct FastAPI Context type hints by @Lancetnik in https://github.com/airtai/faststream/pull/1155
  • Fix bug which results in lost confluent coverage report by @kumaranvpl in https://github.com/airtai/faststream/pull/1160
  • Fix failing ack tests for confluent by @kumaranvpl in https://github.com/airtai/faststream/pull/1166
  • Update version to 0.4.0 and update docs by @kumaranvpl in https://github.com/airtai/faststream/pull/1171
  • feat #1180: add StreamRouter.on_broker_shutdown hook by @Lancetnik in https://github.com/airtai/faststream/pull/1182
  • Fix bug - using old upload-artifact version by @kumaranvpl in https://github.com/airtai/faststream/pull/1183
  • Release 0.4.0 by @davorrunje in https://github.com/airtai/faststream/pull/1184

New Contributors#

  • @saroz014 made their first contribution in https://github.com/airtai/faststream/pull/1154

Full Changelog: https://github.com/airtai/faststream/compare/0.3.13...0.4.0

0.4.0rc0#

What's Changed#

This is a preview version of 0.4.0 release introducing support for Confluent-based Kafka broker.

Here's a simplified code example demonstrating how to establish a connection to Kafka using FastStream's KafkaBroker module:

from faststream import FastStream
from faststream.confluent import KafkaBroker

broker = KafkaBroker("localhost:9092")
app = FastStream(broker)

@broker.subscriber("in-topic")
@broker.publisher("out-topic")
async def handle_msg(user: str, user_id: int) -> str:
    return f"User: {user_id} - {user} registered"

Changes#

Full Changelog: #0.3.13...0.4.0rc0

0.3.13#

What's Changed#

New features#

Bug fixes#

  • Fix minor typos in documentation and code by @mj0nez in #1116

New Contributors#

Full Changelog: #0.3.12...0.3.13

0.3.12#

What's Changed#

Bug fixes#

  • fix (#1110): correct RMQ Topic pattern test publish by @Lancetnik in #1112

Misc#

Full Changelog: #0.3.11...0.3.12

0.3.11#

What's Changed#

NATS concurrent subscriber:

By default, NATS subscriber consumes messages with a block per subject. So, you can't process multiple messages from the same subject at the same time. But, with the broker.subscriber(..., max_workers=...) option, you can! It creates an async tasks pool to consume multiple messages from the same subject and allows you to process them concurrently!

from faststream import FastStream
from faststream.nats import NatsBroker

broker = NatsBroker()
app = FastStream()

@broker.subscriber("test-subject", max_workers=10)
async def handler(...):
   """Can process up to 10 messages in the same time."""
  • Update Release Notes for 0.3.10 by @faststream-release-notes-updater in #1091
  • fix (#1100): FastAPI 0.106 compatibility by @Lancetnik in #1102

Full Changelog: #0.3.10...0.3.11

0.3.10#

What's Changed#

New features#

Bug fixes#

  • fix (#1087): add app_dir option to docs serve/gen commands by @Lancetnik in #1088

Documentation#

Other#

Full Changelog: #0.3.9...0.3.10

0.3.9#

What's Changed#

Bug fixes:#

  • fix (#1082): correct NatsTestClient stream publisher by @Lancetnik in #1083

Chore:#

Full Changelog: #0.3.8...0.3.9

0.3.8#

What's Changed#

Full Changelog: #0.3.7...0.3.8

0.3.7#

What's Changed#

Support regular FastStream Context with FastAPI plugin

from fastapi import FastAPI
from faststream.redis.fastapi import RedisRouter, Logger

router = RedisRouter()

@router.subscriber("test")
async def handler(msg, logger: Logger):
    logger.info(msg)

app = FastAPI(lifespan=router.lifespan_context)
app.include_router(router)

Full Changelog: #0.3.6...0.3.7

0.3.6#

What's Changed#

  • chore: correct update release CI by @Lancetnik in #1050
  • Update Release Notes for main by @faststream-release-notes-updater in #1051
  • chore: fix building docs script by @davorrunje in #1055
  • 0.3.6 by @Lancetnik in #1056
  • bug: remove packaging dependency
  • bug: correct FastAPI batch consuming
  • docs: add search meta to all pages
  • docs: polish all pages styles, fix typos
  • chore: add ruff rule to check print

Full Changelog: #0.3.5...0.3.6

0.3.5#

What's Changed#

A large update by @Lancetnik in #1048

Provides with the ability to setup graceful_timeout to wait for consumed messages processed correctly before application shutdown - Broker(graceful_timeout=30.0) (waits up to 30 seconds)

  • allows to get access to context.get_local("message") from FastAPI plugin
  • docs: fix Avro custom serialization example
  • docs: add KafkaBroker publish_batch notice
  • docs: add RabbitMQ security page
  • fix: respect retry attempts with NackMessage exception
  • test Kafka nack and reject behavior
  • bug: fix import error with anyio version 4.x by @davorrunje in #1049

Full Changelog: #0.3.4...0.3.5

0.3.4#

What's Changed#

Features:#

Documentation#

Chore#

Full Changelog: #0.3.3...0.3.4

0.3.3#

What's Changed#

Features:

Chores:

Full Changelog: #0.3.2...0.3.3

0.3.2#

What's Changed#

New features:#

Chore:#

Full Changelog: #0.3.1...0.3.2

0.3.1#

What's Changed#

Features:

Bug fixes:

  • fix: non-payload information injected included in AsyncAPI docs by @Lancetnik in #1015

Documentation:

  • docs: fix misspelled FastDepends reference in README.md by @spectacularfailure in #1013

New Contributors#

  • @spectacularfailure made their first contribution in #1013

Full Changelog: #0.3.0...0.3.1

0.3.0#

What's Changed#

The main feature of the 0.3.0 release is added Redis support by @Lancetnik in #1003

You can install it by the following command:

pip install "faststream[redis]"

Here is a little code example

from faststream import FastStream, Logger
from faststream.redis import RedisBroker

broker = RedisBroker()
app = FastStream(broker)

@broker.subscriber(
    channel="test",  # or
    # list="test",     or
    # stream="test",
)
async def handle(msg: str, logger: Logger):
    logger.info(msg)

Other features#

  • feat: show reload directories with --reload flag by @Lancetnik in #981
  • feat: implement validate and no_ack subscriber options (#926) by @mihail8531 in #988
  • other features by @Lancetnik in #1003
    • Improve error logs (missing CLI arguments, undefined starting)
    • Add faststream docs serve --reload ... option for documentation hotreload
    • Add faststream run --reload-extension .env option to watch by changes in such files
    • Support faststream run -k 1 -k 2 ... as k=["1", "2"] extra options
    • Add subscriber, publisher and router include_in_schema: bool argument to disable AsyncAPI render
    • remove watchfiles from default distribution
    • Allow create broker.publisher(...) with already running broker
    • FastAPI-like lifespan FastStream application context manager
    • automatic TestBroker(connect_only=...) argument based on AST
    • add NatsMessage.in_progress() method

Testing#

Documentation#

Chore#

New Contributors#

Full Changelog: #0.2.15...0.3.0

0.3.0rc0#

What's Changed#

The main feature of the 0.3.x release is added Redis support by @Lancetnik in #1003

You can install it manually:

pip install faststream==0.3.0rc0 && pip install "faststream[redis]"

Other features#

  • feat: show reload directories with --reload flag by @Lancetnik in #981
  • Improve error logs (missing CLI arguments, undefined starting)
  • Add faststream docs serve --reload ... option for documentation hotreload
  • Add faststream run --reload-extension .env option to watch by changes in such files
  • Support faststream run -k 1 -k 2 ... as k=["1", "2"] extra options
  • Add subscriber, publisher and router include_in_schema: bool argument to disable AsyncAPI render
  • remove watchfiles from default distribution
  • Allow create @broker.publisher(...) with already running broker
  • FastAPI-like lifespan FastStream application context manager
  • automatic TestBroker(connect_only=...) argument based on AST
  • add NatsMessage.in_progress() method

Testing#

Documentation#

Chore#

New Contributors#

Full Changelog: #0.2.15...0.3.0rc0

0.2.15#

What's Changed#

Bug fixes#

Documentation#

Misc#

Full Changelog: https://github.com/airtai/faststream/compare/0.2.14...0.2.15

0.2.14#

What's Changed#

Bug fixes#

Documentation#

Misc#

Full Changelog: https://github.com/airtai/faststream/compare/0.2.13...0.2.14

0.2.13#

What's Changed#

Full Changelog: https://github.com/airtai/faststream/compare/0.2.12...0.2.13

0.2.12#

What's Changed#

Full Changelog: https://github.com/airtai/faststream/compare/0.2.11...0.2.12

0.2.11#

What's Changed#

Bug fixes#

Documentation#

New Contributors#

Full Changelog: https://github.com/airtai/faststream/compare/0.2.10...0.2.11

Documentation#

New Contributors#

Full Changelog: https://github.com/airtai/faststream/compare/0.2.10...0.2.11

0.2.10#

What's Changed#

Now, you can hide your connection secrets in the AsyncAPI schema by manually setting up the server URL:

broker = RabbitBroker(
    "amqp://guest:guest@localhost:5672/",  # Connection URL
    asyncapi_url="amqp://****:****@localhost:5672/",  # Public schema URL
)

Additionally, the RabbitMQ AsyncAPI schema has been improved, adding support for faststream.security, and the connection scheme is now defined automatically.

RabbitMQ connection parameters are now merged, allowing you to define the main connection data as a URL string and customize it using kwargs:

broker = RabbitBroker(
    "amqp://guest:guest@localhost:5672/",
    host="127.0.0.1",
)

# amqp://guest:guest@127.0.0.1:5672/ - The final URL
* A more suitable faststream.security import instead of faststream.broker.security * chore: add release notes for 0.2.9 by @kumaranvpl in https://github.com/airtai/faststream/pull/894 * chore: upgrade packages by @davorrunje in https://github.com/airtai/faststream/pull/901 * chore: use js redirect and redirect to version by @kumaranvpl in https://github.com/airtai/faststream/pull/902 * feat: add asyncapi_url broker arg by @Lancetnik in https://github.com/airtai/faststream/pull/903

Full Changelog: https://github.com/airtai/faststream/compare/0.2.9...0.2.10

0.2.9#

What's Changed#

New Contributors#

Full Changelog: https://github.com/airtai/faststream/compare/0.2.8...0.2.9

0.2.8#

What's Changed#

New Contributors#

Full Changelog: https://github.com/airtai/faststream/compare/0.2.7...0.2.8

0.2.7#

What's Changed#

Full Changelog: https://github.com/airtai/faststream/compare/0.2.6...0.2.7

0.2.6#

What's Changed#

New Contributors#

Full Changelog: https://github.com/airtai/faststream/compare/0.2.5...0.2.6

0.2.5#

What's Changed#

Full Changelog: https://github.com/airtai/faststream/compare/0.2.4...0.2.5

0.2.4#

New Functionalities#

Now, Context provides access to inner dict keys too:

# headers is a `dict`
async def handler(
  user_id: int = Context("message.headers.user_id", cast=True),
): ...

Added Header object as a shortcut to Context("message.headers.") inner fields (NATS example):

# the same with the previous example
async def handler(
  user_id: int = Header(),
  u_id: int = Header("user_id"),  # with custom name
): ...

Added Path object to get access to NATS wildcard subject or RabbitMQ topic routing key (a shortcut to access Context("message.path.") as well):

@nats_broker.subscriber("logs.{level}")
async def handler(
  level: str = Path(),
)

Also, the original message Context annotation was copied from faststream.[broker].annotations.[Broker]Message to faststream.[broker].[Broker]Message to provide you with faster access to the most commonly used object (NATS example).

What's Changed#

Full Changelog: https://github.com/airtai/faststream/compare/0.2.3...0.2.4

0.2.3#

What's Changed#

Full Changelog: https://github.com/airtai/faststream/compare/0.2.2...0.2.3

0.2.2#

What's Changed#

New Contributors#

Full Changelog: https://github.com/airtai/faststream/compare/0.2.1...0.2.2

0.2.1#

What's Changed#

Full Changelog: https://github.com/airtai/faststream/compare/0.2.0...0.2.1

0.2.0#

What's Changed#

Full Changelog: https://github.com/airtai/faststream/compare/0.1.6...0.2.0

0.1.6#

What's Changed#

Full Changelog: https://github.com/airtai/faststream/compare/0.1.5...0.1.6

0.1.4#

What's Changed#

New Contributors#

Full Changelog: https://github.com/airtai/faststream/compare/0.1.3...0.1.4

0.1.3#

What's Changed#

Full Changelog: https://github.com/airtai/faststream/compare/0.1.1...0.1.3

0.1.1#

What's Changed#

Full Changelog: https://github.com/airtai/faststream/commits/0.1.1

0.1.0#

FastStream is a new package based on the ideas and experiences gained from FastKafka and Propan. By joining our forces, we picked up the best from both packages and created the unified way to write services capable of processing streamed data regardless of the underlying protocol. We'll continue to maintain both packages, but new development will be in this project. If you are starting a new service, this package is the recommended way to do it.

Features#

FastStream simplifies the process of writing producers and consumers for message queues, handling all the parsing, networking and documentation generation automatically.

Making streaming microservices has never been easier. Designed with junior developers in mind, FastStream simplifies your work while keeping the door open for more advanced use-cases. Here's a look at the core features that make FastStream a go-to framework for modern, data-centric microservices.

  • Multiple Brokers: FastStream provides a unified API to work across multiple message brokers (Kafka, RabbitMQ support)

  • Pydantic Validation: Leverage Pydantic's validation capabilities to serialize and validates incoming messages

  • Automatic Docs: Stay ahead with automatic AsyncAPI documentation.

  • Intuitive: full typed editor support makes your development experience smooth, catching errors before they reach runtime

  • Powerful Dependency Injection System: Manage your service dependencies efficiently with FastStream's built-in DI system.

  • Testable: supports in-memory tests, making your CI/CD pipeline faster and more reliable

  • Extendable: use extensions for lifespans, custom serialization and middlewares

  • Integrations: FastStream is fully compatible with any HTTP framework you want (FastAPI especially)

  • Built for Automatic Code Generation: FastStream is optimized for automatic code generation using advanced models like GPT and Llama

That's FastStream in a nutshell—easy, efficient, and powerful. Whether you're just starting with streaming microservices or looking to scale, FastStream has got you covered.