FastStream is a framework for asynchronous service development. It lets you build distributed event-based systems easily. Task scheduling is a common use case in such systems.
Unfortunately, this functionality conflicts with the original FastStream ideology and can't be implemented as part of the framework. However, you can integrate scheduling into your FastStream application by using some extra dependencies, and we have some recipes for doing so.
Taskiq is an asynchronous distributed task queue for Python. The project takes inspiration from big projects such as Celery and Dramatiq.
As a Celery replacement, Taskiq should of course support task scheduling and delayed publishing, and it does!
You can easily integrate FastStream with Taskiq. This lets you create cron or delayed tasks that publish messages and thereby trigger your functions.
The Taskiq-FastStream project provides this feature.
It has two helpful classes, `BrokerWrapper` and `AppWrapper`, which make your FastStream App and Broker objects taskiq-compatible.
Let's take a look at a code example.
First, we create a regular FastStream application.
Now, to make it work, we wrap our broker in the special `BrokerWrapper` object.
This creates a taskiq-compatible object that can be used to create a regular taskiq scheduler.
`taskiq_broker.task(...)`, called on the wrapped broker, has the same signature as the original `broker.publish(...)` and lets you plan your publishing tasks using taskiq's `schedule` option (you can learn more about it in the taskiq documentation).
Finally, to run the scheduler, use the taskiq CLI command `taskiq scheduler module:scheduler`, where `module:scheduler` is the import path of your scheduler object.
If you don't want to lose the application's AsyncAPI schema and/or lifespans, you can wrap the application itself instead of the broker, using `AppWrapper`.
It lets you use `taskiq_broker` the same way as in the previous example while keeping all original FastStream features.
Creating a separate scheduler service is the best way to build a truly distributed and sustainable system. In this case, you can just create an empty FastStream broker and use the Taskiq-FastStream integration to publish your messages (consumed by other services).
Generate message payload
You can also compute the message payload right before sending instead of using a fixed value.
To do so, replace the fixed value of the `message` option with a function (sync or async) that returns the data to send.
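As a rough illustration, a payload callback could look like the sketch below. The function name `collect_payload` and its body are hypothetical; the commented-out call shows how it would be passed as `message`, assuming a `taskiq_broker` created with `BrokerWrapper` or `AppWrapper`.

```python
import asyncio


async def collect_payload() -> dict:
    # Hypothetical stand-in for a database query or an external API call.
    await asyncio.sleep(0)
    return {"user": "John", "user_id": 1}


# The function itself, not its result, is passed as `message`:
# taskiq_broker.task(
#     message=collect_payload,
#     subject="in-subject",
#     schedule=[{"cron": "* * * * *"}],
# )
```

Because the callback runs on every scheduled trigger, each published message reflects the data available at that moment.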
It lets you collect data from a database, call an external API, or generate the data to send in some other way right before sending.
Moreover, you can send multiple messages per task using this feature. Just turn your message callback into a generator (sync or async), and Taskiq-FastStream will iterate over your payload and publish all of your messages!
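A generator-based callback might be sketched like this. The name `collect_payloads` and the user list are hypothetical; the commented-out call assumes the same `taskiq_broker` object as before.

```python
from typing import Iterator


def collect_payloads() -> Iterator[dict]:
    # Hypothetical data; each yielded item becomes its own message.
    users = [("John", 1), ("Jane", 2)]
    for user, user_id in users:
        yield {"user": user, "user_id": user_id}


# Passed as `message` exactly like a plain callback:
# taskiq_broker.task(
#     message=collect_payloads,
#     subject="in-subject",
#     schedule=[{"cron": "* * * * *"}],
# )
```

With this callback, one scheduled trigger would result in two published messages.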
You can also integrate your FastStream application with any other library that provides scheduling functionality.
As an example, you can use Rocketry:
```python
import asyncio

from rocketry import Rocketry
from rocketry.args import Arg

from faststream.nats import NatsBroker

app = Rocketry(execution="async")

broker = NatsBroker()  # regular broker
app.params(broker=broker)


async def start_app():
    async with broker:  # connect broker
        await app.serve()  # run rocketry


@app.task("every 1 second", execution="async")
async def publish(br: NatsBroker = Arg("broker")):
    await br.publish("Hi, Rocketry!", "test")


if __name__ == "__main__":
    asyncio.run(start_app())
```