Release Notes#

0.4.7#

What's Changed#

  • Update Release Notes for 0.4.6 by @faststream-release-notes-updater in #1286
  • fix (#1263): correct nested discriminator msg type AsyncAPI schema by @Lancetnik in #1288
  • docs: add apply_types warning notice to subscription/index.md by @Lancetnik in #1291
  • chore: fixed nats-py version by @Lancetnik in #1294

Full Changelog: #0.4.6...0.4.7

0.4.6#

What's Changed#

Full Changelog: #0.4.5...0.4.6

0.4.5#

What's Changed#

  • Update Release Notes for 0.4.4 by @faststream-release-notes-updater in #1260
  • Removed unused pytest dependency from redis/schemas.py by @ashambalev in #1261
  • chore: bumped package versions by @davorrunje in #1270
  • fix (#1263): correct AsyncAPI schema in discriminator case by @Lancetnik in #1272

New Contributors#

Full Changelog: #0.4.4...0.4.5

0.4.4#

What's Changed#

Add RedisStream batch size option:

from faststream.redis import StreamSub

@broker.subscriber(stream=StreamSub("input", batch=True, max_records=3))
async def on_input_data(msgs: list[str]):
    assert len(msgs) <= 3

  • Update Release Notes for 0.4.3 by @faststream-release-notes-updater in #1247
  • docs: add manual run section by @Lancetnik in #1249
  • feat (#1252): respect Redis StreamSub last_id with consumer group by @Lancetnik in #1256
  • fix: correct Redis consumer group behavior by @Lancetnik in #1258
  • feat: add Redis Stream max_records option by @Lancetnik in #1259

Full Changelog: #0.4.3...0.4.4

0.4.3#

What's Changed#

Allow specifying the Redis Stream maxlen option in the publisher:

from faststream.redis import StreamSub

@broker.publisher(stream=StreamSub("Output", max_len=10))
async def on_input_data():
    ...

Full Changelog: #0.4.2...0.4.3

0.4.2#

What's Changed#

Bug fixes#

Full Changelog: #0.4.1...0.4.2

0.4.1#

What's Changed#

Bug fixes#

Documentation#

Full Changelog: #0.4.0...0.4.1

0.4.0#

What's Changed#

This release adds support for Confluent's Python Client for Apache Kafka (TM). Confluent's Python Client for Apache Kafka does not natively support async functions, so its integration with modern async-based services is a bit trickier. That was the reason why our initially supported Kafka broker used aiokafka. However, that choice was a less fortunate one, as it is not as well maintained as the Confluent version. After receiving numerous requests, we finally decided to bite the bullet, create an async wrapper around Confluent's Python Client, and add full support for it in FastStream.

If you want to try it out, install it first with:

pip install "faststream[confluent]>=0.4.0"

To connect to Kafka using the FastStream KafkaBroker module, follow these steps:

  1. Initialize the KafkaBroker instance: Start by initializing a KafkaBroker instance with the necessary configuration, including the Kafka broker address.

  2. Create your processing logic: Write a function that will consume incoming messages in the defined format and produce a response to the defined topic.

  3. Decorate your processing function: To connect your processing function to the desired Kafka topics, decorate it with the @broker.subscriber(...) and @broker.publisher(...) decorators. Once you start your application, your processing function will be called whenever a new message arrives in the subscribed topic, and its return value will be published to the topic defined in the publisher decorator.

Here's a simplified code example demonstrating how to establish a connection to Kafka using FastStream's KafkaBroker module:

from faststream import FastStream
from faststream.confluent import KafkaBroker

broker = KafkaBroker("localhost:9092")
app = FastStream(broker)

@broker.subscriber("in-topic")
@broker.publisher("out-topic")
async def handle_msg(user: str, user_id: int) -> str:
    return f"User: {user_id} - {user} registered"

For more information, please visit the documentation at:

https://faststream.airt.ai/latest/confluent/

List of Changes#

  • Update Release Notes for 0.3.13 by @faststream-release-notes-updater in https://github.com/airtai/faststream/pull/1119
  • docs: close #1125 by @Lancetnik in https://github.com/airtai/faststream/pull/1126
  • Add support for confluent python lib by @kumaranvpl in https://github.com/airtai/faststream/pull/1042
  • Update tutorial docs to include confluent code examples by @kumaranvpl in https://github.com/airtai/faststream/pull/1131
  • Add installation instructions for confluent by @kumaranvpl in https://github.com/airtai/faststream/pull/1132
  • Update Release Notes for 0.4.0rc0 by @faststream-release-notes-updater in https://github.com/airtai/faststream/pull/1130
  • chore: remove useless branch from CI by @Lancetnik in https://github.com/airtai/faststream/pull/1135
  • chore: bump mkdocs-git-revision-date-localized-plugin from 1.2.1 to 1.2.2 by @dependabot in https://github.com/airtai/faststream/pull/1140
  • chore: strict fast-depends version by @Lancetnik in https://github.com/airtai/faststream/pull/1145
  • chore: update copyright by @Lancetnik in https://github.com/airtai/faststream/pull/1144
  • fix: correct Windows shutdown by @Lancetnik in https://github.com/airtai/faststream/pull/1148
  • docs: fix typo by @saroz014 in https://github.com/airtai/faststream/pull/1154
  • Middleware Document Syntax Error by @SepehrBazyar in https://github.com/airtai/faststream/pull/1156
  • fix: correct FastAPI Context type hints by @Lancetnik in https://github.com/airtai/faststream/pull/1155
  • Fix bug which results in lost confluent coverage report by @kumaranvpl in https://github.com/airtai/faststream/pull/1160
  • Fix failing ack tests for confluent by @kumaranvpl in https://github.com/airtai/faststream/pull/1166
  • Update version to 0.4.0 and update docs by @kumaranvpl in https://github.com/airtai/faststream/pull/1171
  • feat #1180: add StreamRouter.on_broker_shutdown hook by @Lancetnik in https://github.com/airtai/faststream/pull/1182
  • Fix bug - using old upload-artifact version by @kumaranvpl in https://github.com/airtai/faststream/pull/1183
  • Release 0.4.0 by @davorrunje in https://github.com/airtai/faststream/pull/1184

New Contributors#

  • @saroz014 made their first contribution in https://github.com/airtai/faststream/pull/1154

Full Changelog: https://github.com/airtai/faststream/compare/0.3.13...0.4.0

0.4.0rc0#

What's Changed#

This is a preview version of the 0.4.0 release, introducing support for a Confluent-based Kafka broker.

Here's a simplified code example demonstrating how to establish a connection to Kafka using FastStream's KafkaBroker module:

from faststream import FastStream
from faststream.confluent import KafkaBroker

broker = KafkaBroker("localhost:9092")
app = FastStream(broker)

@broker.subscriber("in-topic")
@broker.publisher("out-topic")
async def handle_msg(user: str, user_id: int) -> str:
    return f"User: {user_id} - {user} registered"

Changes#

Full Changelog: #0.3.13...0.4.0rc0

0.3.13#

What's Changed#

New features#

Bug fixes#

  • Fix minor typos in documentation and code by @mj0nez in #1116

New Contributors#

Full Changelog: #0.3.12...0.3.13

0.3.12#

What's Changed#

Bug fixes#

  • fix (#1110): correct RMQ Topic pattern test publish by @Lancetnik in #1112

Misc#

Full Changelog: #0.3.11...0.3.12

0.3.11#

What's Changed#

NATS concurrent subscriber:

By default, a NATS subscriber consumes messages from a subject one at a time, so you can't process multiple messages from the same subject simultaneously. But with the broker.subscriber(..., max_workers=...) option, you can! It creates an async task pool to consume multiple messages from the same subject and lets you process them concurrently.

from faststream import FastStream
from faststream.nats import NatsBroker

broker = NatsBroker()
app = FastStream(broker)

@broker.subscriber("test-subject", max_workers=10)
async def handler(msg: str):
    """Can process up to 10 messages at the same time."""

  • Update Release Notes for 0.3.10 by @faststream-release-notes-updater in #1091
  • fix (#1100): FastAPI 0.106 compatibility by @Lancetnik in #1102

Full Changelog: #0.3.10...0.3.11

0.3.10#

What's Changed#

New features#

Bug fixes#

  • fix (#1087): add app_dir option to docs serve/gen commands by @Lancetnik in #1088

Documentation#

Other#

Full Changelog: #0.3.9...0.3.10

0.3.9#

What's Changed#

Bug fixes:#

  • fix (#1082): correct NatsTestClient stream publisher by @Lancetnik in #1083

Chore:#

Full Changelog: #0.3.8...0.3.9

0.3.8#

What's Changed#

Full Changelog: #0.3.7...0.3.8

0.3.7#

What's Changed#

Support regular FastStream Context with the FastAPI plugin:

from fastapi import FastAPI
from faststream.redis.fastapi import RedisRouter, Logger

router = RedisRouter()

@router.subscriber("test")
async def handler(msg, logger: Logger):
    logger.info(msg)

app = FastAPI(lifespan=router.lifespan_context)
app.include_router(router)

Full Changelog: #0.3.6...0.3.7

0.3.6#

What's Changed#

  • chore: correct update release CI by @Lancetnik in #1050
  • Update Release Notes for main by @faststream-release-notes-updater in #1051
  • chore: fix building docs script by @davorrunje in #1055
  • 0.3.6 by @Lancetnik in #1056
  • bug: remove packaging dependency
  • bug: correct FastAPI batch consuming
  • docs: add search meta to all pages
  • docs: polish all pages styles, fix typos
  • chore: add ruff rule to check print

Full Changelog: #0.3.5...0.3.6

0.3.5#

What's Changed#

A large update by @Lancetnik in #1048

It provides the ability to set up a graceful_timeout to wait for consumed messages to be processed correctly before application shutdown: Broker(graceful_timeout=30.0) (waits up to 30 seconds).
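
For example, a minimal sketch of what this can look like (the broker choice and subject name are purely illustrative):

from faststream import FastStream
from faststream.nats import NatsBroker

# graceful_timeout gives in-flight messages up to 30 seconds
# to finish processing before the application shuts down
broker = NatsBroker(graceful_timeout=30.0)
app = FastStream(broker)

@broker.subscriber("work")
async def handler(msg: str):
    ...  # long-running processing is allowed to finish on shutdown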

  • allows access to context.get_local("message") from the FastAPI plugin
  • docs: fix Avro custom serialization example
  • docs: add KafkaBroker publish_batch notice
  • docs: add RabbitMQ security page
  • fix: respect retry attempts with NackMessage exception
  • test Kafka nack and reject behavior
  • bug: fix import error with anyio version 4.x by @davorrunje in #1049

Full Changelog: #0.3.4...0.3.5

0.3.4#

What's Changed#

Features:#

Documentation#

Chore#

Full Changelog: #0.3.3...0.3.4

0.3.3#

What's Changed#

Features:

Chores:

Full Changelog: #0.3.2...0.3.3

0.3.2#

What's Changed#

New features:#

Chore:#

Full Changelog: #0.3.1...0.3.2

0.3.1#

What's Changed#

Features:

Bug fixes:

  • fix: non-payload information injected included in AsyncAPI docs by @Lancetnik in #1015

Documentation:

  • docs: fix misspelled FastDepends reference in README.md by @spectacularfailure in #1013

New Contributors#

  • @spectacularfailure made their first contribution in #1013

Full Changelog: #0.3.0...0.3.1

0.3.0#

What's Changed#

The main feature of the 0.3.0 release is the new Redis support, added by @Lancetnik in #1003

You can install it with the following command:

pip install "faststream[redis]"

Here is a little code example:

from faststream import FastStream, Logger
from faststream.redis import RedisBroker

broker = RedisBroker()
app = FastStream(broker)

@broker.subscriber(
    channel="test",  # or
    # list="test",   # or
    # stream="test",
)
async def handle(msg: str, logger: Logger):
    logger.info(msg)

Other features#

  • feat: show reload directories with --reload flag by @Lancetnik in #981
  • feat: implement validate and no_ack subscriber options (#926) by @mihail8531 in #988
  • other features by @Lancetnik in #1003
    • Improve error logs (missing CLI arguments, undefined starting)
    • Add faststream docs serve --reload ... option for documentation hotreload
    • Add faststream run --reload-extension .env option to watch for changes in such files
    • Support faststream run -k 1 -k 2 ... as k=["1", "2"] extra options
    • Add subscriber, publisher and router include_in_schema: bool argument to disable AsyncAPI render (see the sketch after this list)
    • Remove watchfiles from the default distribution
    • Allow creating broker.publisher(...) with an already running broker
    • FastAPI-like lifespan FastStream application context manager
    • Automatic TestBroker(connect_only=...) argument based on AST
    • Add NatsMessage.in_progress() method
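
For example, a minimal sketch of the include_in_schema argument (the Redis broker and channel name are just for illustration):

from faststream import FastStream
from faststream.redis import RedisBroker

broker = RedisBroker()
app = FastStream(broker)

# this handler still works, but is hidden from the generated AsyncAPI schema
@broker.subscriber("internal-events", include_in_schema=False)
async def internal_handler(msg: str):
    ...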

Testing#

Documentation#

Chore#

New Contributors#

Full Changelog: #0.2.15...0.3.0

0.3.0rc0#

What's Changed#

The main feature of the 0.3.x release is the new Redis support, added by @Lancetnik in #1003

You can install it manually:

pip install faststream==0.3.0rc0 && pip install "faststream[redis]"

Other features#

  • feat: show reload directories with --reload flag by @Lancetnik in #981
  • Improve error logs (missing CLI arguments, undefined starting)
  • Add faststream docs serve --reload ... option for documentation hotreload
  • Add faststream run --reload-extension .env option to watch for changes in such files
  • Support faststream run -k 1 -k 2 ... as k=["1", "2"] extra options
  • Add subscriber, publisher and router include_in_schema: bool argument to disable AsyncAPI render
  • Remove watchfiles from the default distribution
  • Allow creating @broker.publisher(...) with an already running broker
  • FastAPI-like lifespan FastStream application context manager
  • Automatic TestBroker(connect_only=...) argument based on AST
  • Add NatsMessage.in_progress() method

Testing#

Documentation#

Chore#

New Contributors#

Full Changelog: #0.2.15...0.3.0rc0

0.2.15#

What's Changed#

Bug fixes#

Documentation#

Misc#

Full Changelog: https://github.com/airtai/faststream/compare/0.2.14...0.2.15

0.2.14#

What's Changed#

Bug fixes#

Documentation#

Misc#

Full Changelog: https://github.com/airtai/faststream/compare/0.2.13...0.2.14

0.2.13#

What's Changed#

Full Changelog: https://github.com/airtai/faststream/compare/0.2.12...0.2.13

0.2.12#

What's Changed#

Full Changelog: https://github.com/airtai/faststream/compare/0.2.11...0.2.12

0.2.11#

What's Changed#

Bug fixes#

Documentation#

New Contributors#

Full Changelog: https://github.com/airtai/faststream/compare/0.2.10...0.2.11

0.2.10#

What's Changed#

Now, you can hide your connection secrets in the AsyncAPI schema by manually setting up the server URL:

broker = RabbitBroker(
    "amqp://guest:guest@localhost:5672/",  # Connection URL
    asyncapi_url="amqp://****:****@localhost:5672/",  # Public schema URL
)

Additionally, the RabbitMQ AsyncAPI schema has been improved, adding support for faststream.security, and the connection scheme is now defined automatically.
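
A brief sketch of what wiring faststream.security into a broker can look like (SASLPlaintext and the credentials below are only illustrative):

from faststream.rabbit import RabbitBroker
from faststream.security import SASLPlaintext

# credentials passed via the security object are reflected in the
# AsyncAPI schema's security/scheme information instead of the raw URL
security = SASLPlaintext(username="guest", password="guest")
broker = RabbitBroker("amqp://localhost:5672/", security=security)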

RabbitMQ connection parameters are now merged, allowing you to define the main connection data as a URL string and customize it using kwargs:

broker = RabbitBroker(
    "amqp://guest:guest@localhost:5672/",
    host="127.0.0.1",
)

# amqp://guest:guest@127.0.0.1:5672/ - The final URL

  • A more suitable faststream.security import instead of faststream.broker.security
  • chore: add release notes for 0.2.9 by @kumaranvpl in https://github.com/airtai/faststream/pull/894
  • chore: upgrade packages by @davorrunje in https://github.com/airtai/faststream/pull/901
  • chore: use js redirect and redirect to version by @kumaranvpl in https://github.com/airtai/faststream/pull/902
  • feat: add asyncapi_url broker arg by @Lancetnik in https://github.com/airtai/faststream/pull/903

Full Changelog: https://github.com/airtai/faststream/compare/0.2.9...0.2.10

0.2.9#

What's Changed#

New Contributors#

Full Changelog: https://github.com/airtai/faststream/compare/0.2.8...0.2.9

0.2.8#

What's Changed#

New Contributors#

Full Changelog: https://github.com/airtai/faststream/compare/0.2.7...0.2.8

0.2.7#

What's Changed#

Full Changelog: https://github.com/airtai/faststream/compare/0.2.6...0.2.7

0.2.6#

What's Changed#

New Contributors#

Full Changelog: https://github.com/airtai/faststream/compare/0.2.5...0.2.6

0.2.5#

What's Changed#

Full Changelog: https://github.com/airtai/faststream/compare/0.2.4...0.2.5

0.2.4#

New Functionalities#

Now, Context provides access to inner dict keys too:

from faststream import Context

# headers is a `dict`
async def handler(
    user_id: int = Context("message.headers.user_id", cast=True),
): ...

Added a Header object as a shortcut to Context("message.headers.") inner fields (NATS example):

# the same as the previous example
async def handler(
    user_id: int = Header(),
    u_id: int = Header("user_id"),  # with a custom name
): ...

Added a Path object to access the NATS wildcard subject or the RabbitMQ topic routing key (a shortcut to Context("message.path.") as well):

@nats_broker.subscriber("logs.{level}")
async def handler(
    level: str = Path(),
): ...

Also, the original message Context annotation was copied from faststream.[broker].annotations.[Broker]Message to faststream.[broker].[Broker]Message to provide you with faster access to the most commonly used object (NATS example).
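
For instance, a quick NATS sketch of the shorter import (the subject name is illustrative):

from faststream.nats import NatsBroker, NatsMessage

nats_broker = NatsBroker()

@nats_broker.subscriber("logs.info")
async def handler(msg: NatsMessage):
    # the annotation injects the raw incoming message object
    assert msg.raw_message is not None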

What's Changed#

Full Changelog: https://github.com/airtai/faststream/compare/0.2.3...0.2.4

0.2.3#

What's Changed#

Full Changelog: https://github.com/airtai/faststream/compare/0.2.2...0.2.3

0.2.2#

What's Changed#

New Contributors#

Full Changelog: https://github.com/airtai/faststream/compare/0.2.1...0.2.2

0.2.1#

What's Changed#

Full Changelog: https://github.com/airtai/faststream/compare/0.2.0...0.2.1

0.2.0#

What's Changed#

Full Changelog: https://github.com/airtai/faststream/compare/0.1.6...0.2.0

0.1.6#

What's Changed#

Full Changelog: https://github.com/airtai/faststream/compare/0.1.5...0.1.6

0.1.4#

What's Changed#

New Contributors#

Full Changelog: https://github.com/airtai/faststream/compare/0.1.3...0.1.4

0.1.3#

What's Changed#

Full Changelog: https://github.com/airtai/faststream/compare/0.1.1...0.1.3

0.1.1#

What's Changed#

Full Changelog: https://github.com/airtai/faststream/commits/0.1.1

0.1.0#

FastStream is a new package based on the ideas and experiences gained from FastKafka and Propan. By joining our forces, we picked up the best from both packages and created a unified way to write services capable of processing streamed data regardless of the underlying protocol. We'll continue to maintain both packages, but new development will happen in this project. If you are starting a new service, this package is the recommended way to do it.

Features#

FastStream simplifies the process of writing producers and consumers for message queues, handling all the parsing, networking and documentation generation automatically.

Making streaming microservices has never been easier. Designed with junior developers in mind, FastStream simplifies your work while keeping the door open for more advanced use-cases. Here's a look at the core features that make FastStream a go-to framework for modern, data-centric microservices.

  • Multiple Brokers: FastStream provides a unified API to work across multiple message brokers (Kafka, RabbitMQ support)

  • Pydantic Validation: Leverage Pydantic's validation capabilities to serialize and validate incoming messages

  • Automatic Docs: Stay ahead with automatic AsyncAPI documentation.

  • Intuitive: full typed editor support makes your development experience smooth, catching errors before they reach runtime

  • Powerful Dependency Injection System: Manage your service dependencies efficiently with FastStream's built-in DI system.

  • Testable: supports in-memory tests, making your CI/CD pipeline faster and more reliable (see the sketch after this list)

  • Extendable: use extensions for lifespans, custom serialization and middlewares

  • Integrations: FastStream is fully compatible with any HTTP framework you want (FastAPI especially)

  • Built for Automatic Code Generation: FastStream is optimized for automatic code generation using advanced models like GPT and Llama
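
A minimal sketch of such an in-memory test, assuming a simple Kafka handler (pytest with pytest-asyncio, the topic names, and the payload are only illustrative):

import pytest

from faststream.kafka import KafkaBroker, TestKafkaBroker

broker = KafkaBroker("localhost:9092")

@broker.subscriber("in-topic")
@broker.publisher("out-topic")
async def handle_msg(user: str, user_id: int) -> str:
    return f"User: {user_id} - {user} registered"

@pytest.mark.asyncio
async def test_handle_msg():
    # TestKafkaBroker patches the broker, so no real Kafka is required
    async with TestKafkaBroker(broker) as br:
        await br.publish({"user": "John", "user_id": 1}, topic="in-topic")
        handle_msg.mock.assert_called_once_with({"user": "John", "user_id": 1})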

That's FastStream in a nutshell—easy, efficient, and powerful. Whether you're just starting with streaming microservices or looking to scale, FastStream has got you covered.