Dependencies

FastStream uses a companion library, FastDepends, for dependency management. Its dependency system is borrowed directly from FastAPI, so if you already know that framework, dependencies in FastStream will feel familiar.

You can visit the FastDepends documentation for more details, but the key points and additions are covered here.

Type Casting

The key function in the dependency management and type conversion system in FastStream is the decorator @apply_types (also known as @inject in FastDepends).

By default, it is applied to all event handlers, unless you disable this option when creating the broker:

# Kafka
from faststream.kafka import KafkaBroker
broker = KafkaBroker(..., apply_types=False)

# Confluent Kafka
from faststream.confluent import KafkaBroker
broker = KafkaBroker(..., apply_types=False)

# RabbitMQ
from faststream.rabbit import RabbitBroker
broker = RabbitBroker(..., apply_types=False)

# NATS
from faststream.nats import NatsBroker
broker = NatsBroker(..., apply_types=False)

# Redis
from faststream.redis import RedisBroker
broker = RedisBroker(..., apply_types=False)

Warning

Setting the apply_types=False flag not only disables type casting but also Depends and Context. If you want to disable only type casting, use validate=False instead.

This flag can be useful if you are using FastStream within another framework and you need to use its native dependency system.
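To make "type casting" concrete, here is a minimal standard-library sketch of the idea: a decorator that converts each argument to its annotated type before the function body runs. The name cast_args is hypothetical and the logic is a toy; FastDepends relies on pydantic for real validation and handles far more than simple types such as int or float.

```python
import functools
import inspect


def cast_args(func):
    """Toy stand-in for the idea behind @apply_types (hypothetical helper)."""
    signature = inspect.signature(func)

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        bound = signature.bind(*args, **kwargs)
        bound.apply_defaults()
        for name, value in bound.arguments.items():
            annotation = signature.parameters[name].annotation
            # Only simple constructor-style types (int, float, str) are handled.
            if annotation is inspect.Parameter.empty:
                continue
            if not isinstance(value, annotation):
                bound.arguments[name] = annotation(value)
        return func(*bound.args, **bound.kwargs)

    return wrapper


@cast_args
def add(a: int, b: float = 0.5):
    return a + b


result = add("1")  # "1" is converted to int before the body runs
```

With casting switched off, the same call would try to add a str and a float and fail, which is why disabling it shifts the validation burden to your own code.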

Dependency Injection

To implement dependencies in FastStream, use the special Depends class:

# Kafka
from faststream import FastStream, Depends
from faststream.kafka import KafkaBroker

broker = KafkaBroker("localhost:9092")
app = FastStream(broker)

def simple_dependency():
    return 1

@broker.subscriber("test")
async def handler(body: dict, d: int = Depends(simple_dependency)):
    assert d == 1

# Confluent Kafka
from faststream import FastStream, Depends
from faststream.confluent import KafkaBroker

broker = KafkaBroker("localhost:9092")
app = FastStream(broker)

def simple_dependency():
    return 1

@broker.subscriber("test")
async def handler(body: dict, d: int = Depends(simple_dependency)):
    assert d == 1

# RabbitMQ
from faststream import FastStream, Depends
from faststream.rabbit import RabbitBroker

broker = RabbitBroker("amqp://guest:guest@localhost:5672/")
app = FastStream(broker)

def simple_dependency():
    return 1

@broker.subscriber("test")
async def handler(body: dict, d: int = Depends(simple_dependency)):
    assert d == 1

# NATS
from faststream import FastStream, Depends
from faststream.nats import NatsBroker

broker = NatsBroker("nats://localhost:4222")
app = FastStream(broker)

def simple_dependency():
    return 1

@broker.subscriber("test")
async def handler(body: dict, d: int = Depends(simple_dependency)):
    assert d == 1

# Redis
from faststream import FastStream, Depends
from faststream.redis import RedisBroker

broker = RedisBroker("redis://localhost:6379")
app = FastStream(broker)

def simple_dependency():
    return 1

@broker.subscriber("test")
async def handler(body: dict, d: int = Depends(simple_dependency)):
    assert d == 1

The first step: You need to declare a dependency, which can be any Callable object.

Callable

A "Callable" is an object that can be "called". It can be a function, a class, or a class method.

In other words, if you can write my_object(), then my_object is a Callable.
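Python's built-in callable() gives a quick way to check this. The sketch below uses hypothetical names (plain_function, Service) and also includes an instance that defines __call__, which is callable for the same reason.

```python
def plain_function():
    return "function"


class Service:
    """Calling the class itself constructs an instance."""

    def method(self):
        return "method"

    def __call__(self):
        return "instance"


service = Service()

# Each of these can be written as `obj()`, so each is a Callable.
candidates = [plain_function, Service, service, service.method]
assert all(callable(obj) for obj in candidates)

# A plain value cannot be called.
assert not callable(42)
```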

def simple_dependency():
    return 1

The second step: Declare which dependencies you need using Depends.

async def handler(body: dict, d: int = Depends(simple_dependency)):
    assert d == 1

The last step: Just use the result of executing your dependency!

It's easy, isn't it?

Auto @apply_types

In the code above, we didn't apply this decorator to our dependencies. However, it is still applied to every function used as a dependency. Keep this in mind.

Top-level Dependencies

If you don't need a dependency result, you can use the following code:

@broker.subscriber("test")
def method(_ = Depends(...)): ...

However, the dedicated subscriber parameter is a better fit:

@broker.subscriber("test", dependencies=[Depends(...)])
def method(): ...

You can also declare broker-level dependencies, which are applied to all of the broker's handlers:

broker = RabbitBroker(dependencies=[Depends(...)])

Nested Dependencies

Dependencies can also contain other dependencies. This works in a very predictable way: just declare Depends in the dependent function.

# Kafka
from faststream import FastStream, Depends
from faststream.kafka import KafkaBroker

broker = KafkaBroker("localhost:9092")
app = FastStream(broker)

def another_dependency():
    return 1

def simple_dependency(b: int = Depends(another_dependency)):  # a nested dependency is called here
    return b * 2

@broker.subscriber("test")
async def handler(
    body: dict,
    a: int = Depends(another_dependency),
    b: int = Depends(simple_dependency),
):
    assert (a + b) == 3

# Confluent Kafka
from faststream import FastStream, Depends
from faststream.confluent import KafkaBroker

broker = KafkaBroker("localhost:9092")
app = FastStream(broker)

def another_dependency():
    return 1

def simple_dependency(b: int = Depends(another_dependency)):  # a nested dependency is called here
    return b * 2

@broker.subscriber("test")
async def handler(
    body: dict,
    a: int = Depends(another_dependency),
    b: int = Depends(simple_dependency),
):
    assert (a + b) == 3

# RabbitMQ
from faststream import FastStream, Depends
from faststream.rabbit import RabbitBroker

broker = RabbitBroker("amqp://guest:guest@localhost:5672/")
app = FastStream(broker)

def another_dependency():
    return 1

def simple_dependency(b: int = Depends(another_dependency)):  # a nested dependency is called here
    return b * 2

@broker.subscriber("test")
async def handler(
    body: dict,
    a: int = Depends(another_dependency),
    b: int = Depends(simple_dependency),
):
    assert (a + b) == 3

# NATS
from faststream import FastStream, Depends
from faststream.nats import NatsBroker

broker = NatsBroker("nats://localhost:4222")
app = FastStream(broker)

def another_dependency():
    return 1

def simple_dependency(b: int = Depends(another_dependency)):  # a nested dependency is called here
    return b * 2

@broker.subscriber("test")
async def handler(
    body: dict,
    a: int = Depends(another_dependency),
    b: int = Depends(simple_dependency),
):
    assert (a + b) == 3

# Redis
from faststream import FastStream, Depends
from faststream.redis import RedisBroker

broker = RedisBroker("redis://localhost:6379")
app = FastStream(broker)

def another_dependency():
    return 1

def simple_dependency(b: int = Depends(another_dependency)):  # a nested dependency is called here
    return b * 2

@broker.subscriber("test")
async def handler(
    body: dict,
    a: int = Depends(another_dependency),
    b: int = Depends(simple_dependency),
):
    assert (a + b) == 3

Caching

In the example above, the another_dependency function is called only ONCE! FastDepends caches all dependency results within ONE @apply_types call stack, so every nested dependency receives the cached result. Between separate calls of the main function, however, the cache is not shared: the dependency is executed again for each new call.

To prevent this behavior, use Depends(..., use_cache=False). In this case, the dependency is executed separately for each function in the call stack that uses it.
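The caching rule can be modelled in a few lines of standard-library Python. This is a toy resolver, not FastDepends's implementation: ToyDepends and call_with_deps are hypothetical names invented for the illustration, and the resolver only understands parameters whose default is a ToyDepends marker. It shows one cache per top-level call, shared by nested dependencies and discarded afterwards.

```python
import inspect


class ToyDepends:
    """Hypothetical marker, loosely modelled on Depends."""

    def __init__(self, dependency):
        self.dependency = dependency


def call_with_deps(func, cache=None, **kwargs):
    """Call `func`, resolving ToyDepends defaults.

    `cache` is created once per top-level call and passed down to nested calls.
    """
    cache = {} if cache is None else cache
    for name, param in inspect.signature(func).parameters.items():
        marker = param.default
        if name in kwargs or not isinstance(marker, ToyDepends):
            continue
        if marker.dependency not in cache:
            # First use in this call stack: execute and remember the result.
            cache[marker.dependency] = call_with_deps(marker.dependency, cache)
        kwargs[name] = cache[marker.dependency]
    return func(**kwargs)


calls = []


def another_dependency():
    calls.append("another_dependency")
    return 1


def simple_dependency(b=ToyDepends(another_dependency)):
    return b * 2


def handler(a=ToyDepends(another_dependency), b=ToyDepends(simple_dependency)):
    return a + b


first = call_with_deps(handler)   # another_dependency runs once in this call stack
second = call_with_deps(handler)  # a new call stack starts with an empty cache
```

After both calls, calls holds two entries: one execution per top-level call, even though another_dependency is requested twice within each.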

Use with Regular Functions

You can use the decorator @apply_types not only with @broker.subscriber(...), but also with regular functions, both synchronous and asynchronous.

# Synchronous variant
from faststream import Depends, apply_types

def simple_dependency(a: int, b: int = 3):
    return a + b

@apply_types
def method(a: int, d: int = Depends(simple_dependency)):
    return a + d

def test_sync_dependency():
    assert method("1") == 5

# Asynchronous variant
import pytest
from faststream import Depends, apply_types

async def simple_dependency(a: int, b: int = 3):
    return a + b

def another_dependency(a: int):
    return a

@apply_types
async def method(
    a: int,
    b: int = Depends(simple_dependency),
    c: int = Depends(another_dependency),
):
    return a + b + c

@pytest.mark.asyncio
async def test_async_dependency():
    assert 6 == await method("1")

Be careful

In asynchronous code, you can use both synchronous and asynchronous dependencies. But in synchronous code, only synchronous dependencies are available to you.
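The restriction follows from how Python itself works: a coroutine can call a plain function directly and await an async one, while a plain function has no way to await. The standard-library sketch below illustrates that asymmetry with hypothetical helpers (resolve_in_async_code, resolve_in_sync_code). It is not how FastDepends is implemented, and the TypeError is this toy's own choice rather than the error the library raises.

```python
import asyncio
import inspect


def sync_dep():
    return 1


async def async_dep():
    return 2


async def resolve_in_async_code(dependency):
    """Inside a coroutine, a sync result is used directly and an async one is awaited."""
    result = dependency()
    if inspect.isawaitable(result):
        result = await result
    return result


def resolve_in_sync_code(dependency):
    """A plain function cannot await, so this toy rejects coroutine functions."""
    if inspect.iscoroutinefunction(dependency):
        raise TypeError(f"{dependency.__name__} is async and cannot be used here")
    return dependency()


from_sync = asyncio.run(resolve_in_async_code(sync_dep))
from_async = asyncio.run(resolve_in_async_code(async_dep))
```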

Casting Dependency Types

FastDepends, used by FastStream, also casts the return type. This means that the value returned by a dependency is cast twice: once as the dependency's return value and once as the input argument of the main function. This incurs no additional cost if both annotations are the same type. Just keep it in mind. Or not... Anyway, I've warned you.

from faststream import Depends, apply_types

def simple_dependency(a: int, b: int = 3) -> str:
    return a + b  # 'return' is cast to `str` for the first time

@apply_types
def method(a: int, d: int = Depends(simple_dependency)):
    # 'd' is cast to `int` for the second time
    return a + d

assert method("1") == 5

Also, the result of executing the dependency is cached. If you use this dependency in N functions, the cached result will be cast N times (once at the input of each function that uses it).

To avoid problems with this, use mypy or just be careful with the type annotations in your project.