Middlewares
Middlewares are a powerful mechanism that allows you to add additional logic to any stage of the message processing pipeline.
This way, you can greatly extend your FastStream application with features such as:
- Integration with any logging/metrics systems
- Application-level message serialization logic
- Rich publishing of messages with extra information
- And many other capabilities
Middlewares have several methods to override. You can implement some or all of them and use middlewares at the broker, router, or subscriber level. Thus, middlewares are the most flexible FastStream feature.
Message Processing Middleware
Unfortunately, this powerful feature has a somewhat complex signature too.
Using middlewares, you can wrap the entire message processing pipeline. In this case, you need to implement the on_receive and after_processed methods.
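A minimal sketch of such a middleware follows. To stay runnable without a broker, it defines a small stand-in BaseMiddleware; in a real application the base class comes from FastStream. The constructor argument and the exception triple passed to after_processed are assumptions modeled on Python's context-manager protocol, and the events list is a hypothetical log sink, so check the signatures against your installed version.

```python
import asyncio
from types import TracebackType
from typing import Any, List, Optional, Type


class BaseMiddleware:
    """Stand-in for FastStream's BaseMiddleware (assumed shape, not the real class)."""

    def __init__(self, msg: Any = None) -> None:
        self.msg = msg

    async def on_receive(self) -> None:
        pass

    async def after_processed(
        self,
        exc_type: Optional[Type[BaseException]] = None,
        exc_val: Optional[BaseException] = None,
        exc_tb: Optional[TracebackType] = None,
    ) -> Optional[bool]:
        return False  # do not suppress the exception


events: List[str] = []  # hypothetical log sink, for illustration only


class LoggingMiddleware(BaseMiddleware):
    async def on_receive(self) -> None:
        events.append(f"received: {self.msg!r}")
        # Call super() last, as the tip in this section recommends.
        return await super().on_receive()

    async def after_processed(self, exc_type=None, exc_val=None, exc_tb=None):
        status = "failed" if exc_type else "processed"
        events.append(f"{status}: {self.msg!r}")
        return await super().after_processed(exc_type, exc_val, exc_tb)


async def demo() -> None:
    # Simulates what a broker would do around a single message.
    middleware = LoggingMiddleware("hello")
    await middleware.on_receive()
    await middleware.after_processed()


asyncio.run(demo())
```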
These methods should be overridden only in broker-level middlewares.
You can also use BaseMiddleware subclasses as router-level dependencies (they are applied only to objects created by that router).
Tip
Always call the corresponding super() method at the end of your override; this is important for correct error processing.
Subscriber Middleware
Subscriber middlewares are called when your handler function is called. Using them, you can patch the incoming message body right before it is passed to your handler, or catch any handler exception and return a fallback value to publish (the middleware's return value is then published).
In this case, you need to implement the consume_scope middleware method.
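The fallback behavior described above can be sketched as follows. The stand-in BaseMiddleware, the call_next parameter name, and the plain-string message are assumptions made so the snippet runs on its own; in FastStream the base class is imported from the library and msg is a message object rather than a string.

```python
import asyncio
from typing import Any, Awaitable, Callable


class BaseMiddleware:
    """Stand-in for FastStream's BaseMiddleware (assumed shape, not the real class)."""

    def __init__(self, msg: Any = None) -> None:
        self.msg = msg

    async def consume_scope(
        self,
        call_next: Callable[[Any], Awaitable[Any]],
        msg: Any,
    ) -> Any:
        return await call_next(msg)


class FallbackMiddleware(BaseMiddleware):
    async def consume_scope(self, call_next, msg):
        try:
            # Delegate to the default implementation, which calls the handler.
            return await super().consume_scope(call_next, msg)
        except ValueError:
            # This value replaces the handler result.
            return {"status": "fallback"}


async def ok_handler(msg: Any) -> Any:
    return {"status": "ok", "echo": msg}


async def failing_handler(msg: Any) -> Any:
    raise ValueError("cannot process")


ok_result = asyncio.run(FallbackMiddleware().consume_scope(ok_handler, "payload"))
fallback_result = asyncio.run(
    FallbackMiddleware().consume_scope(failing_handler, "payload")
)
```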
Note
The msg argument always holds the already decoded body. To prevent the default json.loads(...) call, use a custom decoder instead.
If you want to apply such a middleware to a specific subscriber instead of the whole application, you can create a function with the same signature and pass it directly to your subscriber.
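A function-style subscriber middleware might look like the sketch below. The parameter names call_next and msg, the calls list, and the demo handler are illustrative assumptions. In FastStream the function would be handed to the subscriber (for example through a middlewares argument; confirm the parameter name in the API reference).

```python
import asyncio
from typing import Any, Awaitable, Callable, List, Tuple

calls: List[Tuple[str, Any]] = []  # hypothetical audit log


async def audit_middleware(
    call_next: Callable[[Any], Awaitable[Any]],
    msg: Any,
) -> Any:
    # Runs right before the handler is called.
    calls.append(("before", msg))
    result = await call_next(msg)
    # Runs after the handler returns.
    calls.append(("after", result))
    return result


async def handler(msg: str) -> str:
    # Stand-in for a subscriber handler.
    return msg.upper()


result = asyncio.run(audit_middleware(handler, "ping"))
```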
Publisher Middlewares
Finally, middlewares let you patch outgoing messages too. For example, you can compress or encode outgoing messages at the application level, or add serialization logic for custom types.
Publisher middlewares can be applied at the broker, router, or individual publisher level. Broker-level publisher middlewares affect every way of publishing a message (including the broker.publish call).
In this case, you need to implement the publish_scope method.
This method receives the message body to send and any other options passed to the publish call (destination, headers, etc.).
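As a sketch of a publish_scope override, the middleware below adds a header to every outgoing message. The stand-in BaseMiddleware, the call_next parameter, the x-source header, and the fake publish function are assumptions for illustration; the headers and destination option names follow the description above, but the exact options depend on the broker.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, List, Tuple


class BaseMiddleware:
    """Stand-in for FastStream's BaseMiddleware (assumed shape, not the real class)."""

    def __init__(self, msg: Any = None) -> None:
        self.msg = msg

    async def publish_scope(
        self,
        call_next: Callable[..., Awaitable[Any]],
        msg: Any,
        **options: Any,
    ) -> Any:
        return await call_next(msg, **options)


class HeaderMiddleware(BaseMiddleware):
    async def publish_scope(self, call_next, msg, **options):
        # Copy the headers so the caller's dict is not mutated.
        headers = dict(options.pop("headers", None) or {})
        headers.setdefault("x-source", "orders-service")  # hypothetical header
        return await super().publish_scope(call_next, msg, headers=headers, **options)


sent: List[Tuple[Any, Dict[str, Any]]] = []


async def fake_publish(msg: Any, **options: Any) -> None:
    # Stand-in for the real publish call; records what would be sent.
    sent.append((msg, options))


asyncio.run(HeaderMiddleware().publish_scope(fake_publish, "hi", destination="out"))
```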
You can also specify a middleware for an individual publisher object. In this case, create a function with the same signature as publish_scope and use it as a publisher middleware.
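A function-style publisher middleware could, for instance, compress outgoing payloads. This is a sketch under assumptions: the call_next parameter name and the fake publish function are illustrative, and zlib is just one possible compression choice.

```python
import asyncio
import zlib
from typing import Any, Awaitable, Callable


async def compress_middleware(
    call_next: Callable[..., Awaitable[Any]],
    msg: Any,
    **options: Any,
) -> Any:
    # Compress only raw bytes; leave other payload types untouched.
    if isinstance(msg, bytes):
        msg = zlib.compress(msg)
    return await call_next(msg, **options)


async def fake_publish(msg: Any, **options: Any) -> Any:
    # Stand-in for the real publish call; returns what would be sent.
    return msg


payload = b"a" * 100
published = asyncio.run(compress_middleware(fake_publish, payload))
```

A consumer of such messages would need a matching decompression step, for example in a custom decoder.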
Note
If you use publish_batch anywhere in your app, your publisher middleware should additionally accept the *msgs argument.
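A batch-aware variant can be sketched as below, assuming the first message arrives as msg and the rest of the batch as *msgs. The tagging logic and the fake publish function are hypothetical and exist only to show that the same middleware handles both single and batch publishing.

```python
import asyncio
from typing import Any, Awaitable, Callable, List


async def tagging_middleware(
    call_next: Callable[..., Awaitable[Any]],
    msg: Any,
    *msgs: Any,
    **options: Any,
) -> Any:
    # *msgs is empty for a single publish and holds the remaining
    # messages for a batch publish.
    tagged = [f"[v1] {m}" for m in (msg, *msgs)]
    return await call_next(*tagged, **options)


async def fake_publish(*msgs: Any, **options: Any) -> List[Any]:
    # Stand-in for the real publish call; returns what would be sent.
    return list(msgs)


single = asyncio.run(tagging_middleware(fake_publish, "a"))
batch = asyncio.run(tagging_middleware(fake_publish, "a", "b", "c"))
```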