Skip to content

make_ping_asgi

faststream.asgi.factories.make_ping_asgi #

make_ping_asgi(broker, /, timeout=None)
Source code in faststream/asgi/factories.py
def make_ping_asgi(
    broker: "BrokerUsecase[Any, Any]",
    /,
    timeout: Optional[float] = None,
) -> "ASGIApp":
    healthy_response = AsgiResponse(b"", 204)
    unhealthy_response = AsgiResponse(b"", 500)

    @get
    async def ping(scope: "Scope") -> AsgiResponse:
        if await broker.ping(timeout):
            return healthy_response
        else:
            return unhealthy_response

    return ping