Tasks Scheduling#

FastStream is a framework for asynchronous service development. It lets you build distributed event-based systems easily. Task scheduling is a common use case in such systems.

Unfortunately, this functionality conflicts with the original FastStream ideology and can't be implemented as part of the framework. However, you can add scheduling to your FastStream application with a few extra dependencies, and we have some recipes for doing so.

Taskiq-FastStream#

Taskiq is an asynchronous distributed task queue for Python. This project takes inspiration from big projects such as Celery and Dramatiq.

As a Celery replacement, Taskiq should of course support task scheduling and delayed publishing. And it does!

You can easily integrate FastStream with Taskiq. This lets you create cron or delayed tasks that publish messages and thereby trigger your handlers.

The Taskiq-FastStream project provides this integration.

You can install it with the following command:

pip install taskiq-faststream

It has two helpful classes, BrokerWrapper and AppWrapper, that make your FastStream App and Broker objects taskiq-compatible.

Let's take a look at the code example.

First, we create a regular FastStream application.

Kafka

from faststream import FastStream
from faststream.kafka import KafkaBroker

broker = KafkaBroker("localhost:9092")
app = FastStream(broker)

@broker.subscriber("in-topic")
@broker.publisher("out-topic")
async def handle_msg(user: str, user_id: int) -> str:
    return f"User: {user_id} - {user} registered"

RabbitMQ

from faststream import FastStream
from faststream.rabbit import RabbitBroker

broker = RabbitBroker("amqp://guest:guest@localhost:5672/")
app = FastStream(broker)

@broker.subscriber("in-queue")
@broker.publisher("out-queue")
async def handle_msg(user: str, user_id: int) -> str:
    return f"User: {user_id} - {user} registered"

NATS

from faststream import FastStream
from faststream.nats import NatsBroker

broker = NatsBroker("nats://localhost:4222")
app = FastStream(broker)

@broker.subscriber("in-subject")
@broker.publisher("out-subject")
async def handle_msg(user: str, user_id: int) -> str:
    return f"User: {user_id} - {user} registered"

Broker Wrapper#

Now, to simply get it working, wrap the Broker in a special BrokerWrapper object:

from taskiq_faststream import BrokerWrapper

taskiq_broker = BrokerWrapper(broker)

This creates a taskiq-compatible object that can be used to build a regular taskiq scheduler.

Kafka

from taskiq import TaskiqScheduler
from taskiq.schedule_sources import LabelScheduleSource

# Publish this message to the topic every minute.
taskiq_broker.task(
    message={"user": "John", "user_id": 1},
    topic="test-topic",
    schedule=[{
        "cron": "* * * * *",
    }],
)

scheduler = TaskiqScheduler(
    broker=taskiq_broker,
    sources=[LabelScheduleSource(taskiq_broker)],
)

RabbitMQ

from taskiq import TaskiqScheduler
from taskiq.schedule_sources import LabelScheduleSource

# Publish this message to the queue every minute.
taskiq_broker.task(
    message={"user": "John", "user_id": 1},
    queue="test-queue",
    schedule=[{
        "cron": "* * * * *",
    }],
)

scheduler = TaskiqScheduler(
    broker=taskiq_broker,
    sources=[LabelScheduleSource(taskiq_broker)],
)

NATS

from taskiq import TaskiqScheduler
from taskiq.schedule_sources import LabelScheduleSource

# Publish this message to the subject every minute.
taskiq_broker.task(
    message={"user": "John", "user_id": 1},
    subject="test-subject",
    schedule=[{
        "cron": "* * * * *",
    }],
)

scheduler = TaskiqScheduler(
    broker=taskiq_broker,
    sources=[LabelScheduleSource(taskiq_broker)],
)

taskiq_broker.task(...) has the same signature as the original broker.publish(...) and lets you plan your publishing tasks using taskiq's schedule option (see the taskiq documentation on scheduling tasks to learn more about it).
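Besides cron, taskiq schedule entries can also describe a one-off delayed run. The sketch below only builds such a schedule list with the standard library. The "time" key is taken from taskiq's schedule labels and is an assumption here: check that your taskiq and Taskiq-FastStream versions accept it, and whether they expect naive or timezone-aware datetimes.

```python
from datetime import datetime, timedelta, timezone

# Recurring entry: standard five-field cron syntax, here "every five minutes".
every_five_minutes = {"cron": "*/5 * * * *"}

# One-off entry: a single run ten minutes from now.
# Assumption: the "time" key and timezone-aware UTC values are supported
# by the installed taskiq version.
once_in_ten_minutes = {
    "time": datetime.now(timezone.utc) + timedelta(minutes=10),
}

# This list is what would be passed as `schedule=` to taskiq_broker.task(...).
schedule = [every_five_minutes, once_in_ten_minutes]
```

Each entry in the list is evaluated separately, so one task can carry several cron and one-off entries.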

Finally, to run the scheduler, please use the taskiq CLI command:

taskiq scheduler module:scheduler
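To make the "* * * * *" label more concrete, here is a simplified sketch of how a five-field cron expression can be checked against a point in time. It is not taskiq's implementation: field_matches and cron_matches are hypothetical helper names, and the sketch requires all five fields to match, which is stricter than classic cron's rule for combining day-of-month and day-of-week.

```python
from datetime import datetime


def field_matches(field: str, value: int, minimum: int) -> bool:
    """Check one cron field ("*", "5", "*/15", "1-5", "0,30") against a value."""
    for part in field.split(","):
        # Split off an optional step: "*/15" -> ("*", "15").
        base, _, step_text = part.partition("/")
        step = int(step_text) if step_text else 1
        if base == "*":
            # Steps are counted from the smallest legal value of the field.
            if (value - minimum) % step == 0:
                return True
        elif "-" in base:
            low, high = (int(x) for x in base.split("-"))
            if low <= value <= high and (value - low) % step == 0:
                return True
        elif int(base) == value:
            return True
    return False


def cron_matches(expression: str, moment: datetime) -> bool:
    """Return True if `moment` satisfies all five fields of `expression`."""
    minute, hour, day, month, weekday = expression.split()
    # cron counts weekdays from Sunday = 0; datetime.weekday() from Monday = 0.
    cron_weekday = (moment.weekday() + 1) % 7
    return (
        field_matches(minute, moment.minute, 0)
        and field_matches(hour, moment.hour, 0)
        and field_matches(day, moment.day, 1)
        and field_matches(month, moment.month, 1)
        and field_matches(weekday, cron_weekday, 0)
    )


moment = datetime(2023, 11, 13, 9, 30)  # a Monday, 09:30
print(cron_matches("* * * * *", moment))     # True
print(cron_matches("*/15 9 * * 1", moment))  # True
print(cron_matches("0 0 1 * *", moment))     # False
```

With "* * * * *" every field matches any value, so the task is due every minute.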

Application Wrapper#

If you don't want to lose the application's AsyncAPI schema and/or lifespans, you can wrap the application itself instead of the broker, using the AppWrapper class.

from taskiq_faststream import AppWrapper

taskiq_broker = AppWrapper(app)

You can then use taskiq_broker the same way as in the previous example, while keeping all the original FastStream features.

Tip

Creating a separate Scheduler service is the best way to build a truly distributed and sustainable system. In this case, you can create an empty FastStream broker and use the Taskiq-FastStream integration to publish your messages (consumed by other services).

Generate message payload#

You can also determine the message payload right before sending instead of fixing it in advance. To do so, replace the final value of the message option with a function (sync or async) that returns the data to send:

async def collect_information_to_send():
    return "Message to send"

taskiq_broker.task(
    message=collect_information_to_send,
    ...
)

This lets you collect data from a database, call an external API, or generate the data to send in some other way right before sending.

Moreover, you can send not one but multiple messages per task with this feature. Just turn your message callback into a generator (sync or async), and Taskiq-FastStream will iterate over your payload and publish all of your messages!

async def collect_information_to_send():
    """Publish 10 messages per task call."""
    for i in range(10):
        yield i

taskiq_broker.task(
    message=collect_information_to_send,
    ...
)
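The four kinds of message option described above (final value, function, async function, sync or async generator) can be normalized into one stream of payloads. The following is a minimal sketch of that idea using only the standard library. It is not the Taskiq-FastStream implementation, and resolve_payloads and collect are hypothetical names.

```python
import asyncio
import inspect
from typing import Any, AsyncIterator


async def resolve_payloads(message: Any) -> AsyncIterator[Any]:
    """Yield every payload described by `message`."""
    # Call the callback, if it is one, to get a value, coroutine, or generator.
    result = message() if callable(message) else message

    if inspect.isasyncgen(result):
        async for item in result:   # async generator: many payloads
            yield item
    elif inspect.isgenerator(result):
        for item in result:         # sync generator: many payloads
            yield item
    elif inspect.isawaitable(result):
        yield await result          # async function: one payload
    else:
        yield result                # final value or sync function: one payload


async def one_async():
    return "from coroutine"


def many_sync():
    yield from ("a", "b")


async def many_async():
    for i in range(3):
        yield i


async def collect(message: Any) -> list:
    return [item async for item in resolve_payloads(message)]


print(asyncio.run(collect("final value")))  # ['final value']
print(asyncio.run(collect(one_async)))      # ['from coroutine']
print(asyncio.run(collect(many_sync)))      # ['a', 'b']
print(asyncio.run(collect(many_async)))     # [0, 1, 2]
```

In a real integration, each yielded item would be passed to the broker's publish call instead of being collected into a list.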

Rocketry#

You can also integrate your FastStream application with any other library that provides scheduling functionality.

As an example, you can use Rocketry:

import asyncio

from rocketry import Rocketry
from rocketry.args import Arg

from faststream.nats import NatsBroker

app = Rocketry(execution="async")

broker = NatsBroker()      # regular broker
app.params(broker=broker)

async def start_app():
    async with broker:     # connect broker
        await app.serve()  # run rocketry

@app.task("every 1 second", execution="async")
async def publish(br: NatsBroker = Arg("broker")):
    await br.publish("Hi, Rocketry!", "test")

if __name__ == "__main__":
    asyncio.run(start_app())
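The same shape (connect the broker once, then let a scheduler call publish on a timer) can be tried out without any scheduling library. The sketch below uses plain asyncio and a hypothetical RecordingBroker stand-in that only records messages, so it runs without a message broker. None of these names come from FastStream or Rocketry.

```python
import asyncio


class RecordingBroker:
    """Hypothetical stand-in for a broker: records messages instead of sending."""

    def __init__(self) -> None:
        self.sent = []  # list of (subject, message) tuples

    async def __aenter__(self) -> "RecordingBroker":
        return self  # a real broker would connect here

    async def __aexit__(self, *exc_info: object) -> None:
        return None  # a real broker would disconnect here

    async def publish(self, message: str, subject: str) -> None:
        self.sent.append((subject, message))


async def publish_periodically(broker, interval: float, times: int) -> None:
    """Publish a greeting `times` times, sleeping `interval` seconds in between."""
    for _ in range(times):
        await broker.publish("Hi, scheduler!", "test")
        await asyncio.sleep(interval)


async def main() -> RecordingBroker:
    broker = RecordingBroker()
    async with broker:  # connect once and reuse the connection for every tick
        await publish_periodically(broker, interval=0.01, times=3)
    return broker


broker = asyncio.run(main())
print(len(broker.sent))  # 3
```

A scheduling library replaces the hand-written loop with declarative rules such as "every 1 second", while the broker handling stays the same.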

Last update: 2023-11-13