FastStream simplifies the process of writing producers and consumers for message queues, handling all the parsing, networking and documentation generation automatically.
Making streaming microservices has never been easier. Designed with junior developers in mind, FastStream simplifies your work while keeping the door open for more advanced use cases. Here's a look at the core features that make FastStream a go-to framework for modern, data-centric microservices.
Multiple Brokers: FastStream provides a unified API to work across multiple message brokers (Kafka [using AIOKafka & Confluent], RabbitMQ, NATS, Redis support)
Testable: Supports in-memory tests, making your CI/CD pipeline faster and more reliable
Extensible: Use extensions for lifespans, custom serialization and middleware
Integrations: FastStream is fully compatible with any HTTP framework you want (FastAPI especially)
That's FastStream in a nutshell—easy, efficient, and powerful. Whether you're just starting with streaming microservices or looking to scale, FastStream has got you covered.
FastStream is a new package based on the ideas and experiences gained from FastKafka and Propan. By joining our forces, we picked up the best from both packages and created a unified way to write services capable of processing streamed data regardless of the underlying protocol. We'll continue to maintain both packages, but new development will be in this project. If you are starting a new service, this package is the recommended way to do it.
FastStream works on Linux, macOS, Windows and most Unix-style operating systems. You can install it with pip as usual:
pipinstallfaststream[kafka]
pipinstallfaststream[confluent]
pipinstallfaststream[rabbit]
pipinstallfaststream[nats]
pipinstallfaststream[redis]
By default FastStream uses PydanticV2 written in Rust, but you can downgrade it manually, if your platform has no Rust support - FastStream will work correctly with PydanticV1 as well.
FastStream brokers provide convenient function decorators @broker.subscriber(...) and @broker.publisher(...) to allow you to delegate the actual process of:
consuming and producing data to Event queues, and
decoding and encoding JSON encoded messages
These decorators make it easy to specify the processing logic for your consumers and producers, allowing you to focus on the core business logic of your application without worrying about the underlying integration.
Also, FastStream uses Pydantic to parse input JSON-encoded data into Python objects, making it easy to work with structured data in your applications, so you can serialize your input messages just using type annotations.
Here is an example Python app using FastStream that consumes data from an incoming data stream and outputs the data to another one:
Also, Pydantic’s BaseModel class allows you to define messages using a declarative syntax, making it easy to specify the fields and types of your messages.
The service can be tested using the TestBroker context managers, which, by default, puts the Broker into "testing mode".
The Tester will redirect your subscriber and publisher decorated functions to the InMemory brokers, allowing you to quickly test your app without the need for a running broker and all its dependencies.
Using pytest, the test for our service would look like this:
The application can be started using the built-in FastStream CLI command.
To run the service, use the FastStream CLI command and pass the module (in this case, the file where the app implementation is located) and the app symbol to the command.
faststreamrunbasic:app
After running the command, you should see the following output:
FastStream automatically generates documentation for your project according to the AsyncAPI specification. You can work with both generated artifacts and place a web view of your documentation on resources available to related teams.
The availability of such documentation significantly simplifies the integration of services: you can immediately see what channels and message formats the application works with. And most importantly, it won't cost anything - FastStream has already created the docs for you!
FastStream (thanks to FastDepends) has a dependency management system similar to pytest fixtures and FastAPI Depends at the same time. Function arguments declare which dependencies you want are needed, and a special decorator delivers them from the global Context object.
importfalconimportfalcon.asgifromfaststream.kafkaimportKafkaBrokerbroker=KafkaBroker("localhost:9092")@broker.subscriber("test")asyncdefbase_handler(body):print(body)classThingsResource:asyncdefon_get(self,req,resp):resp.status=falcon.HTTP_200resp.content_type=falcon.MEDIA_TEXTresp.text=("\nTwo things awe me most, the starry sky ""above me and the moral law within me.\n""\n"" ~ Immanuel Kant\n\n")classStreamMiddleware:asyncdefprocess_startup(self,scope,event):awaitbroker.start()asyncdefprocess_shutdown(self,scope,event):awaitbroker.close()app=falcon.asgi.App()app.add_middleware(StreamMiddleware())app.add_route("/things",ThingsResource())
Just import a StreamRouter you need and declare the message handler with the same @router.subscriber(...) and @router.publisher(...) decorators.
Tip
When used this way, FastStream does not utilize its own dependency and serialization system but integrates seamlessly into FastAPI. This means you can use Depends, BackgroundTasks, and other FastAPI tools as if it were a regular HTTP endpoint.