Tasks Scheduling#
FastStream is a framework for asynchronous service development. It allows you to build disturbed event-based systems in an easy way. Tasks scheduling is a pretty often usecase in such systems.
Unfortunatelly, this functional conflicts with the original FastStream ideology and can't be implemented as a part of the framework. But, you can integrate scheduling in your FastStream application by using some extra dependencies. And we have some reciepts how to make it.
Taskiq-FastStream#
Taskiq is an asynchronous distributed task queue for python. This project takes inspiration from big projects such as Celery and Dramatiq.
As a Celery replacement, Taskiq should support tasks sheduling and delayied publishing, of course. And it does!
By the way, you can easely integrate FastStream with the Taskiq. It allows you to create cron or delayied tasks to publish messages and trigger some functions this way.
We have a helpful project to provide you with this feature - Taskiq-FastStream.
It has two hepfull classes BrokerWrapper
and AppWrapper
to make your FastStream App and Broker objects taskiq-compatible.
Let's take a look at the code example.
At first, we should create a regular FastStream application.
Broker Wrapper#
Now, if you want to make it just working, we should wrap our Broker
to special BrokerWrapper
object:
It creates a taskiq-compatible object, that can be used as an object to create a regular taskiq scheduler.
broker.task(...)
has the same with the original broker.publish(...)
signature and allows you to plan your publishing tasks usign the great taskiq schedule
option (you can learn more about it here).
Finally, to run the scheduler, please use the taskiq CLI command:
Application Wrapper#
If you don't wont to lost application AsyncAPI schema or/and lifespans, you can wrap not the broker, but application by itself using AppWrapper
class.
It allows you to use taskiq_broker
the same way with the previous example, but saves all original FastStream features.
Tip
Creating a separated Scheduler service is a best way to make really disturbed and susteinable system. In this case, you can just create an empty FastStream broker and use Taskiq-FastStream integration to publish your messages (consuming by another services).
Generate message payload#
Also, you able to determine message payload right before sending and do not use the final one. To make it, just replace message
option from the final value to function (sync or async), that returns data to send:
async def collect_information_to_send():
return "Message to send"
taskiq_broker.task(
message=collect_information_to_send,
...
)
It allows you to collect some data from database, request an outer API, or use another ways to generate data to send right before sending.
More than, you can send not one, but multiple messages per one task using this feature. Just turn your message callback function to generator (sync or async) - and Taskiq-FastStream will iterates over your payload and publishes all of your messages!
async def collect_information_to_send():
"""Publish 10 messages per task call."""
for i in range(10):
yield i
taskiq_broker.task(
message=collect_information_to_send,
...
)
Rocketry#
Also, you can integrate your FastStream application with any other libraries provides you with a scheduling functional.
As an example, you can use Rocketry:
import asyncio
from rocketry import Rocketry
from rocketry.args import Arg
from faststream.nats import NatsBroker
app = Rocketry(execution="async")
broker = NatsBroker() # regular broker
app.params(broker=broker)
async def start_app():
async with broker: # connect broker
await app.serve() # run rocketry
@app.task("every 1 second", execution="async")
async def publish(br: NatsBroker = Arg("broker")):
await br.publish("Hi, Rocketry!", "test")
if __name__ == "__main__":
asyncio.run(start_app())