Skip to content

FastAPI Plugin#

Handling messages#

FastStream can be used as a part of FastAPI.

Just import a StreamRouter you need and declare the message handler in the same way as with a regular FastStream application.

Tip

When used in this way, FastStream does not use its own dependency system but integrates into FastAPI. That is, you can use Depends, BackgroundTasks and other original FastAPI features as if it were a regular HTTP endpoint, but you can't use faststream.Context and faststream.Depends.

Note that the code below uses fastapi.Depends, not faststream.Depends.

Also, instead of the original faststream.Context, you should use faststream.[broker].fastapi.Context (the same applies to the already created annotations).

from fastapi import Depends, FastAPI
from pydantic import BaseModel

from faststream.kafka.fastapi import KafkaRouter, Logger

router = KafkaRouter("localhost:9092")

class Incoming(BaseModel):  # pydantic model used as the type of the handler's `m` argument
    m: dict

def call():  # injected into the handler below via fastapi.Depends (not faststream.Depends)
    return True

@router.subscriber("test")
@router.publisher("response")
async def hello(m: Incoming, logger: Logger, d=Depends(call)):
    logger.info(m)
    return {"response": "Hello, Kafka!"}

@router.get("/")  # the same router object also declares a regular HTTP route
async def hello_http():
    return "Hello, HTTP!"

# The router's lifespan context is passed to FastAPI as the application lifespan.
app = FastAPI(lifespan=router.lifespan_context)
app.include_router(router)
from fastapi import Depends, FastAPI
from pydantic import BaseModel

from faststream.confluent.fastapi import KafkaRouter, Logger

router = KafkaRouter("localhost:9092")

class Incoming(BaseModel):  # pydantic model used as the type of the handler's `m` argument
    m: dict

def call():  # injected into the handler below via fastapi.Depends (not faststream.Depends)
    return True

@router.subscriber("test")
@router.publisher("response")
async def hello(m: Incoming, logger: Logger, d=Depends(call)):
    logger.info(m)
    return {"response": "Hello, Kafka!"}

@router.get("/")  # the same router object also declares a regular HTTP route
async def hello_http():
    return "Hello, HTTP!"

# The router's lifespan context is passed to FastAPI as the application lifespan.
app = FastAPI(lifespan=router.lifespan_context)
app.include_router(router)
from fastapi import Depends, FastAPI
from pydantic import BaseModel

from faststream.rabbit.fastapi import RabbitRouter, Logger

router = RabbitRouter("amqp://guest:guest@localhost:5672/")

class Incoming(BaseModel):  # pydantic model used as the type of the handler's `m` argument
    m: dict

def call():  # injected into the handler below via fastapi.Depends (not faststream.Depends)
    return True

@router.subscriber("test")
@router.publisher("response")
async def hello(m: Incoming, logger: Logger, d=Depends(call)):
    logger.info(m)
    return {"response": "Hello, Rabbit!"}

@router.get("/")  # the same router object also declares a regular HTTP route
async def hello_http():
    return "Hello, HTTP!"

# The router's lifespan context is passed to FastAPI as the application lifespan.
app = FastAPI(lifespan=router.lifespan_context)
app.include_router(router)
from fastapi import Depends, FastAPI
from pydantic import BaseModel

from faststream.nats.fastapi import NatsRouter, Logger

router = NatsRouter("nats://localhost:4222")

class Incoming(BaseModel):  # pydantic model used as the type of the handler's `m` argument
    m: dict

def call():  # injected into the handler below via fastapi.Depends (not faststream.Depends)
    return True

@router.subscriber("test")
@router.publisher("response")
async def hello(m: Incoming, logger: Logger, d=Depends(call)):
    logger.info(m)
    return {"response": "Hello, NATS!"}

@router.get("/")  # the same router object also declares a regular HTTP route
async def hello_http():
    return "Hello, HTTP!"

# The router's lifespan context is passed to FastAPI as the application lifespan.
app = FastAPI(lifespan=router.lifespan_context)
app.include_router(router)
from fastapi import Depends, FastAPI
from pydantic import BaseModel

from faststream.redis.fastapi import RedisRouter, Logger

router = RedisRouter("redis://localhost:6379")

class Incoming(BaseModel):  # pydantic model used as the type of the handler's `m` argument
    m: dict

def call():  # injected into the handler below via fastapi.Depends (not faststream.Depends)
    return True

@router.subscriber("test")
@router.publisher("response")
async def hello(m: Incoming, logger: Logger, d=Depends(call)):
    logger.info(m)
    return {"response": "Hello, Redis!"}

@router.get("/")  # the same router object also declares a regular HTTP route
async def hello_http():
    return "Hello, HTTP!"

# The router's lifespan context is passed to FastAPI as the application lifespan.
app = FastAPI(lifespan=router.lifespan_context)
app.include_router(router)

When processing a message from a broker, the entire message body is placed simultaneously in both the body and path request parameters. You can access them in any way convenient for you. The message header is placed in headers.

Also, this router can be fully used as an HttpRouter (of which it is the inheritor). So, you can use it to declare any get, post, put and other HTTP methods. For example, this is done at line 20 of the example above, where the @router.get("/") handler is declared.

Warning

If your ASGI server does not support installing state inside lifespan, you can disable this behavior as follows:

router = StreamRouter(..., setup_state=False)

However, after that, you will not be able to access the broker from your application's state (but it is still available as the router.broker).

Accessing the Broker Object#

Inside each router, there is a broker. You can easily access it if you need to send a message to MQ:

from fastapi import FastAPI

from faststream.kafka.fastapi import KafkaRouter

router = KafkaRouter("localhost:9092")

app = FastAPI(lifespan=router.lifespan_context)


@router.get("/")
async def hello_http():
    # The broker is reachable as `router.broker`; use it to publish a message.
    await router.broker.publish("Hello, Kafka!", "test")
    return "Hello, HTTP!"


app.include_router(router)
from fastapi import FastAPI

from faststream.confluent.fastapi import KafkaRouter

router = KafkaRouter("localhost:9092")

app = FastAPI(lifespan=router.lifespan_context)


@router.get("/")
async def hello_http():
    # The broker is reachable as `router.broker`; use it to publish a message.
    await router.broker.publish("Hello, Kafka!", "test")
    return "Hello, HTTP!"


app.include_router(router)
from fastapi import FastAPI

from faststream.rabbit.fastapi import RabbitRouter

router = RabbitRouter("amqp://guest:guest@localhost:5672/")

app = FastAPI(lifespan=router.lifespan_context)


@router.get("/")
async def hello_http():
    # The broker is reachable as `router.broker`; use it to publish a message.
    await router.broker.publish("Hello, Rabbit!", "test")
    return "Hello, HTTP!"


app.include_router(router)
from fastapi import FastAPI

from faststream.nats.fastapi import NatsRouter

router = NatsRouter("nats://localhost:4222")

app = FastAPI(lifespan=router.lifespan_context)


@router.get("/")
async def hello_http():
    # The broker is reachable as `router.broker`; use it to publish a message.
    await router.broker.publish("Hello, NATS!", "test")
    return "Hello, HTTP!"


app.include_router(router)
from fastapi import FastAPI

from faststream.redis.fastapi import RedisRouter

router = RedisRouter("redis://localhost:6379")

app = FastAPI(lifespan=router.lifespan_context)


@router.get("/")
async def hello_http():
    # The broker is reachable as `router.broker`; use it to publish a message.
    await router.broker.publish("Hello, Redis!", "test")
    return "Hello, HTTP!"


app.include_router(router)

Also, you can use the following Depends to access the broker if you want to use it in different parts of your program:

from fastapi import Depends, FastAPI
from typing_extensions import Annotated

from faststream.kafka import KafkaBroker, fastapi

router = fastapi.KafkaRouter("localhost:9092")

app = FastAPI(lifespan=router.lifespan_context)


def broker():
    # Dependency provider: hands out the broker owned by the router.
    return router.broker


@router.get("/")
async def hello_http(broker: Annotated[KafkaBroker, Depends(broker)]):
    # Here `broker` is the injected parameter, which shadows the provider function above.
    await broker.publish("Hello, Kafka!", "test")
    return "Hello, HTTP!"


app.include_router(router)
from fastapi import Depends, FastAPI
from typing_extensions import Annotated

from faststream.confluent import KafkaBroker, fastapi

router = fastapi.KafkaRouter("localhost:9092")

app = FastAPI(lifespan=router.lifespan_context)


def broker():
    # Dependency provider: hands out the broker owned by the router.
    return router.broker


@router.get("/")
async def hello_http(broker: Annotated[KafkaBroker, Depends(broker)]):
    # Here `broker` is the injected parameter, which shadows the provider function above.
    await broker.publish("Hello, Kafka!", "test")
    return "Hello, HTTP!"


app.include_router(router)
from fastapi import Depends, FastAPI
from typing_extensions import Annotated

from faststream.rabbit import RabbitBroker, fastapi

router = fastapi.RabbitRouter("amqp://guest:guest@localhost:5672/")

app = FastAPI(lifespan=router.lifespan_context)


def broker():
    # Dependency provider: hands out the broker owned by the router.
    return router.broker


@router.get("/")
async def hello_http(broker: Annotated[RabbitBroker, Depends(broker)]):
    # Here `broker` is the injected parameter, which shadows the provider function above.
    await broker.publish("Hello, Rabbit!", "test")
    return "Hello, HTTP!"


app.include_router(router)
from fastapi import Depends, FastAPI
from typing_extensions import Annotated

from faststream.nats import NatsBroker, fastapi

router = fastapi.NatsRouter("nats://localhost:4222")

app = FastAPI(lifespan=router.lifespan_context)


def broker():
    # Dependency provider: hands out the broker owned by the router.
    return router.broker


@router.get("/")
async def hello_http(broker: Annotated[NatsBroker, Depends(broker)]):
    # Here `broker` is the injected parameter, which shadows the provider function above.
    await broker.publish("Hello, NATS!", "test")
    return "Hello, HTTP!"


app.include_router(router)
from fastapi import Depends, FastAPI
from typing_extensions import Annotated

from faststream.redis import RedisBroker, fastapi

router = fastapi.RedisRouter("redis://localhost:6379")

app = FastAPI(lifespan=router.lifespan_context)


def broker():
    # Dependency provider: hands out the broker owned by the router.
    return router.broker


@router.get("/")
async def hello_http(broker: Annotated[RedisBroker, Depends(broker)]):
    # Here `broker` is the injected parameter, which shadows the provider function above.
    await broker.publish("Hello, Redis!", "test")
    return "Hello, HTTP!"


app.include_router(router)

Or you can access the broker from a FastAPI application state (if you don't disable it with setup_state=False):

from fastapi import Request

# Fragment: `app` is not defined in this snippet — presumably the FastAPI application
# from the examples above.
@app.get("/")
def main(request: Request):
    # Read the broker from the request state (not available with setup_state=False).
    broker = request.state.broker

@after_startup#

The FastStream application has the @after_startup hook, which allows you to perform operations with your message broker after the connection is established. This can be extremely convenient for managing your brokers' objects and/or sending messages. This hook is also available for your FastAPI StreamRouter:

from fastapi import FastAPI

from faststream.kafka.fastapi import KafkaRouter

router = KafkaRouter("localhost:9092")


@router.subscriber("test")
async def hello(msg: str):
    return {"response": "Hello, Kafka!"}


# Hook registered through the router; it receives the FastAPI app and
# publishes a message to "test" via the router's broker.
@router.after_startup
async def test(app: FastAPI):
    await router.broker.publish("Hello!", "test")


app = FastAPI(lifespan=router.lifespan_context)
app.include_router(router)
from fastapi import FastAPI

from faststream.confluent.fastapi import KafkaRouter

router = KafkaRouter("localhost:9092")


@router.subscriber("test")
async def hello(msg: str):
    return {"response": "Hello, Kafka!"}


# Hook registered through the router; it receives the FastAPI app and
# publishes a message to "test" via the router's broker.
@router.after_startup
async def test(app: FastAPI):
    await router.broker.publish("Hello!", "test")


app = FastAPI(lifespan=router.lifespan_context)
app.include_router(router)
from fastapi import FastAPI

from faststream.rabbit.fastapi import RabbitRouter

router = RabbitRouter("amqp://guest:guest@localhost:5672/")


@router.subscriber("test")
async def hello(msg: str):
    return {"response": "Hello, Rabbit!"}


# Hook registered through the router; it receives the FastAPI app and
# publishes a message to "test" via the router's broker.
@router.after_startup
async def test(app: FastAPI):
    await router.broker.publish("Hello!", "test")


app = FastAPI(lifespan=router.lifespan_context)
app.include_router(router)
from fastapi import FastAPI

from faststream.nats.fastapi import NatsRouter

router = NatsRouter("nats://localhost:4222")


@router.subscriber("test")
async def hello(msg: str):
    return {"response": "Hello, NATS!"}


# Hook registered through the router; it receives the FastAPI app and
# publishes a message to "test" via the router's broker.
@router.after_startup
async def test(app: FastAPI):
    await router.broker.publish("Hello!", "test")


app = FastAPI(lifespan=router.lifespan_context)
app.include_router(router)
from fastapi import FastAPI

from faststream.redis.fastapi import RedisRouter

router = RedisRouter("redis://localhost:6379")


@router.subscriber("test")
async def hello(msg: str):
    return {"response": "Hello, Redis!"}


# Hook registered through the router; it receives the FastAPI app and
# publishes a message to "test" via the router's broker.
@router.after_startup
async def test(app: FastAPI):
    await router.broker.publish("Hello!", "test")


app = FastAPI(lifespan=router.lifespan_context)
app.include_router(router)

Documentation#

When using FastStream as a router for FastAPI, the framework automatically registers endpoints for hosting AsyncAPI documentation into your application with the following default values:

from faststream.kafka.fastapi import KafkaRouter

router = KafkaRouter(
    ...,  # placeholder for the rest of your router arguments
    schema_url="/asyncapi",  # default; base URL of the AsyncAPI documentation routes
    include_in_schema=True,  # default
)
from faststream.confluent.fastapi import KafkaRouter

router = KafkaRouter(
    ...,  # placeholder for the rest of your router arguments
    schema_url="/asyncapi",  # default; base URL of the AsyncAPI documentation routes
    include_in_schema=True,  # default
)
from faststream.rabbit.fastapi import RabbitRouter

router = RabbitRouter(
    ...,  # placeholder for the rest of your router arguments
    schema_url="/asyncapi",  # default; base URL of the AsyncAPI documentation routes
    include_in_schema=True,  # default
)
from faststream.nats.fastapi import NatsRouter

router = NatsRouter(
    ...,  # placeholder for the rest of your router arguments
    schema_url="/asyncapi",  # default; base URL of the AsyncAPI documentation routes
    include_in_schema=True,  # default
)
from faststream.redis.fastapi import RedisRouter

router = RedisRouter(
    ...,  # placeholder for the rest of your router arguments
    schema_url="/asyncapi",  # default; base URL of the AsyncAPI documentation routes
    include_in_schema=True,  # default
)

This way, you will have three routes to interact with your application's AsyncAPI schema:

  • /asyncapi - the same as the CLI created page
  • /asyncapi.json - download the JSON schema representation
  • /asyncapi.yaml - download the YAML schema representation

Testing#

To test your FastAPI StreamRouter, you can still use it with the TestClient (the test broker for your broker type, e.g. TestKafkaBroker), passing it router.broker:

import pytest

from faststream.kafka import TestKafkaBroker, fastapi

router = fastapi.KafkaRouter()


@router.subscriber("test")
async def handler(msg: str):
    ...


@pytest.mark.asyncio
async def test_router():
    # Wrap the router's own broker in the test broker for the duration of the test.
    async with TestKafkaBroker(router.broker) as br:
        await br.publish("Hi!", "test")

        # The subscriber exposes a `mock`; check it was called with the published message.
        handler.mock.assert_called_once_with("Hi!")
import pytest

from faststream.confluent import TestKafkaBroker, fastapi

router = fastapi.KafkaRouter()


@router.subscriber("test")
async def handler(msg: str):
    ...


@pytest.mark.asyncio
async def test_router():
    # Wrap the router's own broker in the test broker for the duration of the test.
    async with TestKafkaBroker(router.broker) as br:
        await br.publish("Hi!", "test")

        # The subscriber exposes a `mock`; check it was called with the published message.
        handler.mock.assert_called_once_with("Hi!")
import pytest

from faststream.rabbit import TestRabbitBroker, fastapi

router = fastapi.RabbitRouter()


@router.subscriber("test")
async def handler(msg: str):
    ...


@pytest.mark.asyncio
async def test_router():
    # Wrap the router's own broker in the test broker for the duration of the test.
    async with TestRabbitBroker(router.broker) as br:
        await br.publish("Hi!", "test")

        # The subscriber exposes a `mock`; check it was called with the published message.
        handler.mock.assert_called_once_with("Hi!")
import pytest

from faststream.nats import TestNatsBroker, fastapi

router = fastapi.NatsRouter()


@router.subscriber("test")
async def handler(msg: str):
    ...


@pytest.mark.asyncio
async def test_router():
    # Wrap the router's own broker in the test broker for the duration of the test.
    async with TestNatsBroker(router.broker) as br:
        await br.publish("Hi!", "test")

        # The subscriber exposes a `mock`; check it was called with the published message.
        handler.mock.assert_called_once_with("Hi!")
import pytest

from faststream.redis import TestRedisBroker, fastapi

router = fastapi.RedisRouter()


@router.subscriber("test")
async def handler(msg: str):
    ...


@pytest.mark.asyncio
async def test_router():
    # Wrap the router's own broker in the test broker for the duration of the test.
    async with TestRedisBroker(router.broker) as br:
        await br.publish("Hi!", "test")

        # The subscriber exposes a `mock`; check it was called with the published message.
        handler.mock.assert_called_once_with("Hi!")

Multiple Routers#

When using FastStream as a FastAPI plugin, you are still able to separate message processing logic between different routers (like with a regular HTTPRouter). But it can be confusing: how should you include multiple routers if you have to set router.lifespan_context as the FastAPI object's lifespan?

You can do it in two ways, depending on your needs.

Routers nesting#

If you want to use the SAME CONNECTION for all of your routers, you should nest them into each other and finally include only the core router in the FastAPI object.

from fastapi import FastAPI
from faststream.kafka.fastapi import KafkaRouter

core_router = KafkaRouter()
nested_router = KafkaRouter()

@core_router.subscriber("core-topic")
async def handler():
    ...

@nested_router.subscriber("nested-topic")
async def nested_handler():
    ...

# Nest the second router into the core one so that both use the same connection.
core_router.include_router(nested_router)

# Only the core router is attached to FastAPI: it supplies both the lifespan and the routes.
app = FastAPI(lifespan=core_router.lifespan_context)
app.include_router(core_router)
from fastapi import FastAPI
from faststream.confluent.fastapi import KafkaRouter

core_router = KafkaRouter()
nested_router = KafkaRouter()

@core_router.subscriber("core-topic")
async def handler():
    ...

@nested_router.subscriber("nested-topic")
async def nested_handler():
    ...

# Nest the second router into the core one so that both use the same connection.
core_router.include_router(nested_router)

# Only the core router is attached to FastAPI: it supplies both the lifespan and the routes.
app = FastAPI(lifespan=core_router.lifespan_context)
app.include_router(core_router)
from fastapi import FastAPI
from faststream.rabbit.fastapi import RabbitRouter

core_router = RabbitRouter()
nested_router = RabbitRouter()

@core_router.subscriber("core-queue")
async def handler():
    ...

@nested_router.subscriber("nested-queue")
async def nested_handler():
    ...

# Nest the second router into the core one so that both use the same connection.
core_router.include_router(nested_router)

# Only the core router is attached to FastAPI: it supplies both the lifespan and the routes.
app = FastAPI(lifespan=core_router.lifespan_context)
app.include_router(core_router)
from fastapi import FastAPI
from faststream.nats.fastapi import NatsRouter

core_router = NatsRouter()
nested_router = NatsRouter()

@core_router.subscriber("core-subject")
async def handler():
    ...

@nested_router.subscriber("nested-subject")
async def nested_handler():
    ...

# Nest the second router into the core one so that both use the same connection.
core_router.include_router(nested_router)

# Only the core router is attached to FastAPI: it supplies both the lifespan and the routes.
app = FastAPI(lifespan=core_router.lifespan_context)
app.include_router(core_router)
from fastapi import FastAPI
from faststream.redis.fastapi import RedisRouter

core_router = RedisRouter()
nested_router = RedisRouter()

@core_router.subscriber("core-channel")
async def handler():
    ...

@nested_router.subscriber("nested-channel")
async def nested_handler():
    ...

# Nest the second router into the core one so that both use the same connection.
core_router.include_router(nested_router)

# Only the core router is attached to FastAPI: it supplies both the lifespan and the routes.
app = FastAPI(lifespan=core_router.lifespan_context)
app.include_router(core_router)

This way, the core router collects all nested routers' publishers and subscribers and stores them as its own.

Custom lifespan#

Otherwise, if you want to have multiple connections to different broker instances, you should start the routers independently in your custom lifespan:

from contextlib import asynccontextmanager

from fastapi import FastAPI
from faststream.kafka.fastapi import KafkaRouter

core_router = KafkaRouter()
nested_router = KafkaRouter()

@asynccontextmanager
async def lifespan(app: FastAPI):
    """Run the lifespan contexts of both routers for the lifetime of the app.

    Each router is started independently instead of being nested into the other.
    """
    # The context managers are listed without wrapping parentheses: the
    # parenthesized multi-item form is only official syntax since Python 3.10,
    # and on Python 3.8 it is parsed as a tuple and fails at runtime.
    async with core_router.lifespan_context(app), nested_router.lifespan_context(app):
        yield

# Both routers are included for their routes; the custom lifespan starts them.
app = FastAPI(lifespan=lifespan)
app.include_router(core_router)
app.include_router(nested_router)
from contextlib import asynccontextmanager

from fastapi import FastAPI
from faststream.confluent.fastapi import KafkaRouter

core_router = KafkaRouter()
nested_router = KafkaRouter()

@asynccontextmanager
async def lifespan(app: FastAPI):
    """Run the lifespan contexts of both routers for the lifetime of the app.

    Each router is started independently instead of being nested into the other.
    """
    # The context managers are listed without wrapping parentheses: the
    # parenthesized multi-item form is only official syntax since Python 3.10,
    # and on Python 3.8 it is parsed as a tuple and fails at runtime.
    async with core_router.lifespan_context(app), nested_router.lifespan_context(app):
        yield

# Both routers are included for their routes; the custom lifespan starts them.
app = FastAPI(lifespan=lifespan)
app.include_router(core_router)
app.include_router(nested_router)
from contextlib import asynccontextmanager

from fastapi import FastAPI
from faststream.rabbit.fastapi import RabbitRouter

core_router = RabbitRouter()
nested_router = RabbitRouter()

@asynccontextmanager
async def lifespan(app: FastAPI):
    """Run the lifespan contexts of both routers for the lifetime of the app.

    Each router is started independently instead of being nested into the other.
    """
    # The context managers are listed without wrapping parentheses: the
    # parenthesized multi-item form is only official syntax since Python 3.10,
    # and on Python 3.8 it is parsed as a tuple and fails at runtime.
    async with core_router.lifespan_context(app), nested_router.lifespan_context(app):
        yield

# Both routers are included for their routes; the custom lifespan starts them.
app = FastAPI(lifespan=lifespan)
app.include_router(core_router)
app.include_router(nested_router)
from contextlib import asynccontextmanager

from fastapi import FastAPI
from faststream.nats.fastapi import NatsRouter

core_router = NatsRouter()
nested_router = NatsRouter()

@asynccontextmanager
async def lifespan(app: FastAPI):
    """Run the lifespan contexts of both routers for the lifetime of the app.

    Each router is started independently instead of being nested into the other.
    """
    # The context managers are listed without wrapping parentheses: the
    # parenthesized multi-item form is only official syntax since Python 3.10,
    # and on Python 3.8 it is parsed as a tuple and fails at runtime.
    async with core_router.lifespan_context(app), nested_router.lifespan_context(app):
        yield

# Both routers are included for their routes; the custom lifespan starts them.
app = FastAPI(lifespan=lifespan)
app.include_router(core_router)
app.include_router(nested_router)
from contextlib import asynccontextmanager

from fastapi import FastAPI
from faststream.redis.fastapi import RedisRouter

core_router = RedisRouter()
nested_router = RedisRouter()

@asynccontextmanager
async def lifespan(app: FastAPI):
    """Run the lifespan contexts of both routers for the lifetime of the app.

    Each router is started independently instead of being nested into the other.
    """
    # The context managers are listed without wrapping parentheses: the
    # parenthesized multi-item form is only official syntax since Python 3.10,
    # and on Python 3.8 it is parsed as a tuple and fails at runtime.
    async with core_router.lifespan_context(app), nested_router.lifespan_context(app):
        yield

# Both routers are included for their routes; the custom lifespan starts them.
app = FastAPI(lifespan=lifespan)
app.include_router(core_router)
app.include_router(nested_router)

Warning

This way you lose AsyncAPI schema, but we are working on it.