Lifespan Context Manager

You can also define startup and shutdown logic with the lifespan parameter of the FastStream app and a "context manager" (I'll show you what that is in a second).

Let's start with the example from the hooks page and refactor it to use a "context manager".

We create an async function lifespan() that contains a yield, like this:

AIOKafka

from contextlib import asynccontextmanager

from faststream import Context, ContextRepo, FastStream
from faststream.kafka import KafkaBroker

broker = KafkaBroker("localhost:9092")


def fake_ml_model_answer(x: float):
    return x * 42


@asynccontextmanager
async def lifespan(context: ContextRepo):
    # Load the fake ML model
    ml_models = {"answer_to_everything": fake_ml_model_answer}
    context.set_global("model", ml_models)

    yield

    # Clean up the ML models and release the resources
    ml_models.clear()


@broker.subscriber("test")
async def predict(x: float, model=Context()):
    result = model["answer_to_everything"](x)
    return {"result": result}


app = FastStream(broker, lifespan=lifespan)

Confluent

from contextlib import asynccontextmanager

from faststream import Context, ContextRepo, FastStream
from faststream.confluent import KafkaBroker

broker = KafkaBroker("localhost:9092")


def fake_ml_model_answer(x: float):
    return x * 42


@asynccontextmanager
async def lifespan(context: ContextRepo):
    # Load the fake ML model
    ml_models = {"answer_to_everything": fake_ml_model_answer}
    context.set_global("model", ml_models)

    yield

    # Clean up the ML models and release the resources
    ml_models.clear()


@broker.subscriber("test")
async def predict(x: float, model=Context()):
    result = model["answer_to_everything"](x)
    return {"result": result}


app = FastStream(broker, lifespan=lifespan)

RabbitMQ

from contextlib import asynccontextmanager

from faststream import Context, ContextRepo, FastStream
from faststream.rabbit import RabbitBroker

broker = RabbitBroker("amqp://guest:guest@localhost:5672/")


def fake_ml_model_answer(x: float):
    return x * 42


@asynccontextmanager
async def lifespan(context: ContextRepo):
    # Load the fake ML model
    ml_models = {"answer_to_everything": fake_ml_model_answer}
    context.set_global("model", ml_models)

    yield

    # Clean up the ML models and release the resources
    ml_models.clear()


@broker.subscriber("test")
async def predict(x: float, model=Context()):
    result = model["answer_to_everything"](x)
    return {"result": result}


app = FastStream(broker, lifespan=lifespan)

NATS

from contextlib import asynccontextmanager

from faststream import Context, ContextRepo, FastStream
from faststream.nats import NatsBroker

broker = NatsBroker("nats://localhost:4222")


def fake_ml_model_answer(x: float):
    return x * 42


@asynccontextmanager
async def lifespan(context: ContextRepo):
    # Load the fake ML model
    ml_models = {"answer_to_everything": fake_ml_model_answer}
    context.set_global("model", ml_models)

    yield

    # Clean up the ML models and release the resources
    ml_models.clear()


@broker.subscriber("test")
async def predict(x: float, model=Context()):
    result = model["answer_to_everything"](x)
    return {"result": result}


app = FastStream(broker, lifespan=lifespan)

Redis

from contextlib import asynccontextmanager

from faststream import Context, ContextRepo, FastStream
from faststream.redis import RedisBroker

broker = RedisBroker("redis://localhost:6379")


def fake_ml_model_answer(x: float):
    return x * 42


@asynccontextmanager
async def lifespan(context: ContextRepo):
    # Load the fake ML model
    ml_models = {"answer_to_everything": fake_ml_model_answer}
    context.set_global("model", ml_models)

    yield

    # Clean up the ML models and release the resources
    ml_models.clear()


@broker.subscriber("test")
async def predict(x: float, model=Context()):
    result = model["answer_to_everything"](x)
    return {"result": result}


app = FastStream(broker, lifespan=lifespan)

As you can see, the lifespan parameter is a much better fit than separate @app.on_startup and @app.after_shutdown hooks when the same object has to be handled at both application startup and shutdown.
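
For readers new to context managers, here is a minimal sketch of the mechanism using only the standard library, with no FastStream involved. The code before yield runs when the async with block is entered, and the code after yield runs when the block exits. The events list and the main() function are illustrative names, and the try/finally is an addition of this sketch (the example above does not use it) so that cleanup also runs if the body raises.

```python
import asyncio
from contextlib import asynccontextmanager

events = []  # records the order in which each phase runs


@asynccontextmanager
async def lifespan():
    events.append("startup")  # runs when the `async with` block is entered
    try:
        yield
    finally:
        events.append("shutdown")  # runs when the block exits, even on error


async def main():
    async with lifespan():
        events.append("running")  # stands in for the application's main loop


asyncio.run(main())
print(events)  # ['startup', 'running', 'shutdown']
```

Passing lifespan=lifespan to FastStream asks the app to do the equivalent of the async with in main() around its own run.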

Tip

lifespan starts BEFORE the broker is started (like the @app.on_startup hook) and finishes AFTER the broker has been shut down (like the @app.after_shutdown hook), so you can't publish any messages from it.

If you need to perform actions while the broker is already or still running, use the @app.after_startup and @app.on_shutdown hooks instead.
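
The ordering described in this tip can be illustrated with a toy model. This is only a sketch under stated assumptions: ToyBroker, run_app() and the RuntimeError are hypothetical stand-ins written for this illustration, not FastStream's API or its actual error behavior. The point is that the lifespan body runs outside the window in which the broker is running.

```python
import asyncio
from contextlib import asynccontextmanager


class ToyBroker:
    """Hypothetical stand-in for a broker; not part of FastStream."""

    def __init__(self):
        self.running = False
        self.sent = []

    async def publish(self, message):
        if not self.running:
            raise RuntimeError("broker is not running")
        self.sent.append(message)


broker = ToyBroker()
errors = []  # collects failures from publishing outside the running window


@asynccontextmanager
async def lifespan():
    try:
        await broker.publish("too early")  # broker has not started yet
    except RuntimeError as exc:
        errors.append(str(exc))
    yield
    try:
        await broker.publish("too late")  # broker has already stopped
    except RuntimeError as exc:
        errors.append(str(exc))


async def run_app():
    async with lifespan():
        broker.running = True          # the broker starts after lifespan is entered
        await broker.publish("hello")  # roughly where @app.after_startup would run
        await broker.publish("bye")    # roughly where @app.on_shutdown would run
        broker.running = False         # the broker stops before lifespan exits


asyncio.run(run_app())
print(broker.sent)  # ['hello', 'bye']
print(errors)       # ['broker is not running', 'broker is not running']
```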

lifespan also supports all the features of FastStream hooks:

  • Dependency Injection
  • Passing extra CLI options