Skip to content

QUICK START#

Install using pip:

pip install "faststream[kafka]"

Tip

To start a new project, we need a test broker container

docker run -d --rm -p 9092:9092 --name test-mq \
-e KAFKA_ENABLE_KRAFT=yes \
-e KAFKA_CFG_NODE_ID=1 \
-e KAFKA_CFG_PROCESS_ROLES=broker,controller \
-e KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER \
-e KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093 \
-e KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT \
-e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092 \
-e KAFKA_BROKER_ID=1 \
-e KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@localhost:9093 \
-e ALLOW_PLAINTEXT_LISTENER=yes \
bitnami/kafka:3.5.0

pip install "faststream[confluent]"

Tip

To start a new project, we need a test broker container

docker run -d --rm -p 9092:9092 --name test-mq \
-e KAFKA_ENABLE_KRAFT=yes \
-e KAFKA_CFG_NODE_ID=1 \
-e KAFKA_CFG_PROCESS_ROLES=broker,controller \
-e KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER \
-e KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093 \
-e KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT \
-e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092 \
-e KAFKA_BROKER_ID=1 \
-e KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@localhost:9093 \
-e ALLOW_PLAINTEXT_LISTENER=yes \
bitnami/kafka:3.5.0

pip install "faststream[rabbit]"

Tip

To start a new project, we need a test broker container

docker run -d --rm -p 5672:5672 --name test-mq rabbitmq:alpine

pip install "faststream[nats]"

Tip

To start a new project, we need a test broker container

bash docker run -d --rm -p 4222:4222 --name test-mq nats -js

pip install "faststream[redis]"

Tip

To start a new project, we need a test broker container

bash docker run -d --rm -p 6379:6379 --name test-mq redis

Basic Usage#

To create a basic application, add the following code to a new file (e.g. serve.py):

serve.py
from faststream import FastStream
from faststream.kafka import KafkaBroker

broker = KafkaBroker("localhost:9092")

app = FastStream(broker)


@broker.subscriber("test")
async def base_handler(body):
    print(body)
serve.py
from faststream import FastStream
from faststream.confluent import KafkaBroker

broker = KafkaBroker("localhost:9092")

app = FastStream(broker)


@broker.subscriber("test")
async def base_handler(body):
    print(body)
serve.py
from faststream import FastStream
from faststream.rabbit import RabbitBroker

broker = RabbitBroker("amqp://guest:guest@localhost:5672/")

app = FastStream(broker)


@broker.subscriber("test")
async def base_handler(body):
    print(body)
serve.py
from faststream import FastStream
from faststream.nats import NatsBroker

broker = NatsBroker("nats://localhost:4222")

app = FastStream(broker)


@broker.subscriber("test")
async def base_handler(body):
    print(body)
serve.py
from faststream import FastStream
from faststream.redis import RedisBroker

broker = RedisBroker("redis://localhost:6379")

app = FastStream(broker)


@broker.subscriber("test")
async def base_handler(body):
    print(body)

And just run this command:

faststream run serve:app

After running the command, you should see the following output:

INFO     - FastStream app starting...
INFO     - test |            - `BaseHandler` waiting for messages
INFO     - FastStream app started successfully! To exit, press CTRL+C

Enjoy your new development experience!

Manual run#

Also, you can run the FastStream application manually, as a regular async function:

import asyncio

async def main():
    app = FastStream(...)
    await app.run()  # blocking method

if __name__ == "__main__":
    asyncio.run(main())

Other tools integrations#

If you want to use FastStream as part of another framework service, you probably don't need to utilize the FastStream object at all, as it is primarily intended as a CLI tool target. Instead, you can start and stop your broker as part of another framework's lifespan. You can find such examples in the integrations section.

Don't forget to stop the test broker container
docker container stop test-mq