Tasks Scheduling#
FastStream is a framework for asynchronous service development. It allows you to build disturbed event-based systems in an easy way. Tasks scheduling is a pretty often use case in such systems.
Unfortunately, this functional conflicts with the original FastStream ideology and can't be implemented as a part of the framework. But, you can integrate scheduling in your FastStream application by using some extra dependencies. And we have some receipts on how to make it.
Taskiq-FastStream#
Taskiq is an asynchronous distributed task queue for python. This project takes inspiration from big projects such as Celery and Dramatiq.
As a Celery replacement, Taskiq should support tasks scheduling and delayed publishing, of course. And it does!
By the way, you can easily integrate FastStream with the Taskiq. It allows you to create cron or delayed tasks to publish messages and trigger some functions this way.
We have a helpful project to provide you with this feature - Taskiq-FastStream.
It has two helpful classes BrokerWrapper
and AppWrapper
to make your FastStream App and Broker objects taskiq-compatible.
Let's take a look at the code example.
At first, we should create a regular FastStream application.
Broker Wrapper#
Now, if you want to make it just working, we should wrap our Broker
to special BrokerWrapper
object:
It creates a taskiq-compatible object, that can be used as an object to create a regular taskiq scheduler.
We patched the original TaskiqScheduler
to support message generation callbacks, but its signature remains the same.
broker.task(...)
has the same with the original broker.publish(...)
signature and allows you to plan your publishing tasks using the great taskiq schedule
option (you can learn more about it here).
Finally, to run the scheduler, please use the taskiq CLI command:
Application Wrapper#
If you don't want to lose application AsyncAPI schema or/and lifespans, you can wrap not the broker, but the application itself using AppWrapper
class.
It allows you to use taskiq_broker
the same way with the previous example, but saves all original FastStream features.
Tip
Creating a separated Scheduler service is a best way to make really disturbed and sustainable system. In this case, you can just create an empty FastStream broker and use Taskiq-FastStream integration to publish your messages (consuming by another services).
Generate message payload#
Also, you are able to determine message payload right before sending and do not use the final one. To make it, just replace message
option from the final value to function (sync or async), that returns data to send:
async def collect_information_to_send():
return "Message to send"
taskiq_broker.task(
message=collect_information_to_send,
...
)
It allows you to collect some data from database, request an outer API, or use another ways to generate data to send right before sending.
More than, you can send not one, but multiple messages per one task using this feature. Just turn your message callback function to generator (sync or async) - and Taskiq-FastStream will iterate over your payload and publish all of your messages!
async def collect_information_to_send():
"""Publish 10 messages per task call."""
for i in range(10):
yield i
taskiq_broker.task(
message=collect_information_to_send,
...
)
Rocketry#
Also, you can integrate your FastStream application with any other libraries provides you with a scheduling functional.
As an example, you can use Rocketry: