Also, you can define startup and shutdown logic using the lifespan parameter of the FastStream app, and a "context manager" (I'll show you what that is in a second).
Let's start with an example from the hooks page and refactor it using a "context manager".
We create an async function lifespan() with yield like this:
```python
from contextlib import asynccontextmanager

from faststream import Context, ContextRepo, FastStream
from faststream.kafka import KafkaBroker

broker = KafkaBroker("localhost:9092")


def fake_ml_model_answer(x: float):
    return x * 42


@asynccontextmanager
async def lifespan(context: ContextRepo):
    # load fake ML model
    ml_models = {"answer_to_everything": fake_ml_model_answer}
    context.set_global("model", ml_models)

    yield

    # Clean up the ML models and release the resources
    ml_models.clear()


@broker.subscriber("test")
async def predict(x: float, model=Context()):
    result = model["answer_to_everything"](x)
    return {"result": result}


app = FastStream(broker, lifespan=lifespan)
```
```python
from contextlib import asynccontextmanager

from faststream import Context, ContextRepo, FastStream
from faststream.confluent import KafkaBroker

broker = KafkaBroker("localhost:9092")


def fake_ml_model_answer(x: float):
    return x * 42


@asynccontextmanager
async def lifespan(context: ContextRepo):
    # load fake ML model
    ml_models = {"answer_to_everything": fake_ml_model_answer}
    context.set_global("model", ml_models)

    yield

    # Clean up the ML models and release the resources
    ml_models.clear()


@broker.subscriber("test")
async def predict(x: float, model=Context()):
    result = model["answer_to_everything"](x)
    return {"result": result}


app = FastStream(broker, lifespan=lifespan)
```
```python
from contextlib import asynccontextmanager

from faststream import Context, ContextRepo, FastStream
from faststream.rabbit import RabbitBroker

broker = RabbitBroker("amqp://guest:guest@localhost:5672/")


def fake_ml_model_answer(x: float):
    return x * 42


@asynccontextmanager
async def lifespan(context: ContextRepo):
    # load fake ML model
    ml_models = {"answer_to_everything": fake_ml_model_answer}
    context.set_global("model", ml_models)

    yield

    # Clean up the ML models and release the resources
    ml_models.clear()


@broker.subscriber("test")
async def predict(x: float, model=Context()):
    result = model["answer_to_everything"](x)
    return {"result": result}


app = FastStream(broker, lifespan=lifespan)
```
```python
from contextlib import asynccontextmanager

from faststream import Context, ContextRepo, FastStream
from faststream.nats import NatsBroker

broker = NatsBroker("nats://localhost:4222")


def fake_ml_model_answer(x: float):
    return x * 42


@asynccontextmanager
async def lifespan(context: ContextRepo):
    # load fake ML model
    ml_models = {"answer_to_everything": fake_ml_model_answer}
    context.set_global("model", ml_models)

    yield

    # Clean up the ML models and release the resources
    ml_models.clear()


@broker.subscriber("test")
async def predict(x: float, model=Context()):
    result = model["answer_to_everything"](x)
    return {"result": result}


app = FastStream(broker, lifespan=lifespan)
```
```python
from contextlib import asynccontextmanager

from faststream import Context, ContextRepo, FastStream
from faststream.redis import RedisBroker

broker = RedisBroker("redis://localhost:6379")


def fake_ml_model_answer(x: float):
    return x * 42


@asynccontextmanager
async def lifespan(context: ContextRepo):
    # load fake ML model
    ml_models = {"answer_to_everything": fake_ml_model_answer}
    context.set_global("model", ml_models)

    yield

    # Clean up the ML models and release the resources
    ml_models.clear()


@broker.subscriber("test")
async def predict(x: float, model=Context()):
    result = model["answer_to_everything"](x)
    return {"result": result}


app = FastStream(broker, lifespan=lifespan)
```
As you can see, the lifespan parameter is a much better fit than separate @app.on_startup and @app.after_shutdown calls when you have an object that needs handling at both application startup and shutdown.
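If the "context manager" part still feels a bit abstract, here is a minimal standard-library sketch of the mechanics, with no FastStream involved. Everything before `yield` runs on entry (startup), everything after it runs on exit (shutdown). The `resources` dict, the `events` list, and `main()` are hypothetical names used only for illustration, and the `try`/`finally` is an addition that makes the cleanup run even if the body raises.

```python
import asyncio
from contextlib import asynccontextmanager

events = []  # hypothetical log, just to make the order of execution visible


@asynccontextmanager
async def lifespan(resources: dict):
    # Startup phase: everything before `yield` runs on entering `async with`
    resources["model"] = lambda x: x * 42
    events.append("startup")
    try:
        yield
    finally:
        # Shutdown phase: everything after `yield` runs on leaving `async with`
        resources.clear()
        events.append("shutdown")


async def main():
    resources = {}
    async with lifespan(resources):
        # The "application" runs here, between startup and shutdown
        events.append("running")
        result = resources["model"](2)
    return result, resources


result, resources = asyncio.run(main())
```

Because both phases live in one function, the object created at startup is still in scope at shutdown, which is roughly what the lifespan parameter gives you compared to two separate hooks.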
Tip
lifespan starts BEFORE your broker is started (like the @app.on_startup hook) and finishes AFTER the broker has been shut down (like @app.after_shutdown), so you can't publish any messages here.
If you need to perform actions while the broker is already/still running, use the @app.after_startup and @app.on_shutdown hooks instead.
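To picture the ordering described above, here is a toy model in plain Python. It is not FastStream code: `ToyBroker`, `run_app`, and `observed` are hypothetical stand-ins, and the sketch only assumes what the tip states, namely that the lifespan wraps the broker's start and stop.

```python
import asyncio
from contextlib import asynccontextmanager


class ToyBroker:
    """Hypothetical stand-in for a real broker; it only tracks a running flag."""

    def __init__(self):
        self.running = False

    async def start(self):
        self.running = True

    async def stop(self):
        self.running = False


observed = {}  # hypothetical record of the broker state seen at each phase


@asynccontextmanager
async def lifespan(broker: ToyBroker):
    # Before `yield`: the broker has not been started yet
    observed["at_startup"] = broker.running
    yield
    # After `yield`: the broker has already been stopped
    observed["at_shutdown"] = broker.running


async def run_app(broker: ToyBroker):
    # Toy application runner: the lifespan wraps the broker start/stop
    async with lifespan(broker):
        await broker.start()
        observed["while_serving"] = broker.running
        await broker.stop()


asyncio.run(run_app(ToyBroker()))
```

In this model the broker is not running in either lifespan phase, which is why publishing from inside lifespan is not possible.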
Also, lifespan supports all the features of FastStream hooks: