def get_app_schema(app: Union[FastStream, "StreamRouter[Any]"]) -> Schema:
    """Assemble the ``Schema`` object describing an application.

    Servers and channels are collected from the application's broker. Every
    channel is then bound to all collected server names, and each ``Message``
    attached to a channel's subscribe/publish operation is replaced by the
    result of ``_resolve_msg_payloads``. The ``messages`` and ``payloads``
    dicts passed to that helper are the same objects that end up in the
    schema's ``Components``.

    Args:
        app: An instance of FastStream or StreamRouter.

    Returns:
        The schema built from the application's metadata, servers, channels
        and components.

    Raises:
        RuntimeError: If ``app.broker`` is ``None``.
    """
    servers = get_app_broker_server(app)
    channels = get_app_broker_channels(app)

    # Shared across all channels: handed to ``_resolve_msg_payloads`` on every
    # call and later exposed as ``Components.messages`` / ``Components.schemas``.
    messages: Dict[str, Message] = {}
    payloads: Dict[str, Dict[str, Any]] = {}

    for channel_name, channel in channels.items():
        # A fresh list per channel, so channels never share one list object.
        channel.servers = list(servers)

        # Subscribe is handled before publish; both go through the same steps.
        for operation_name in ("subscribe", "publish"):
            operation = getattr(channel, operation_name)
            if operation is None:
                continue

            message = operation.message
            if isinstance(message, Message):  # pragma: no branch
                operation.message = _resolve_msg_payloads(
                    message,
                    channel_name,
                    payloads,
                    messages,
                )

    broker = app.broker
    if broker is None:  # pragma: no cover
        raise RuntimeError()

    info = Info(
        title=app.title,
        version=app.version,
        description=app.description,
        termsOfService=app.terms_of_service,
        contact=app.contact,
        license=app.license,
    )

    return Schema(
        info=info,
        defaultContentType=ContentTypes.json.value,
        id=app.identifier,
        tags=list(app.asyncapi_tags) if app.asyncapi_tags else None,
        externalDocs=app.external_docs,
        servers=servers,
        channels=channels,
        components=Components(
            messages=messages,
            schemas=payloads,
            # Security schemes are only emitted when the broker defines security.
            securitySchemes=(
                broker.security.get_schema()
                if broker.security is not None
                else None
            ),
        ),
    )