Skip to content

make_ping_asgi

faststream.asgi.factories.make_ping_asgi #

make_ping_asgi(
    broker, /, timeout=None, include_in_schema=True
)
Source code in faststream/asgi/factories.py
def make_ping_asgi(
    broker: "BrokerUsecase[Any, Any]",
    /,
    timeout: Optional[float] = None,
    include_in_schema: bool = True,
) -> "ASGIApp":
    healthy_response = AsgiResponse(b"", 204)
    unhealthy_response = AsgiResponse(b"", 500)

    @get(include_in_schema=include_in_schema)
    async def ping(scope: "Scope") -> AsgiResponse:
        if await broker.ping(timeout):
            return healthy_response
        return unhealthy_response

    return ping