Get the broker server for an application.
Source code in faststream/asyncapi/generate.py
def get_broker_server(
    broker: "BrokerUsecase[MsgType, ConnectionType]",
) -> Dict[str, Server]:
    """Build the AsyncAPI server entries describing a broker.

    Every entry shares the broker's protocol, protocol version, description
    and tags, plus its security requirement when security is configured.

    Args:
        broker: Broker whose URL(s) and metadata are turned into servers.

    Returns:
        A mapping of server name to ``Server``. A single URL (a plain string
        or a one-element sequence) is stored under ``"development"``; any
        other sequence yields ``"Server1"``, ``"Server2"``, ... in order.
    """
    shared_kwargs: Dict[str, Any] = dict(
        protocol=broker.protocol,
        protocolVersion=broker.protocol_version,
        description=broker.description,
        tags=broker.tags,
    )
    # TODO: "variables" and "bindings" are not populated yet.

    security = broker.security
    if security is not None:
        shared_kwargs["security"] = security.get_requirement()

    location = broker.url

    # A bare string is one URL, not a sequence of them.
    if isinstance(location, str):
        return {"development": Server(url=location, **shared_kwargs)}

    # A one-element sequence is named the same way as a bare string.
    if len(location) == 1:
        return {"development": Server(url=location[0], **shared_kwargs)}

    # Several URLs: number the servers starting from 1.
    return {
        f"Server{position}": Server(url=address, **shared_kwargs)
        for position, address in enumerate(location, 1)
    }
|