publish(ctx, app=Argument(..., help='FastStream app instance, e.g., main:app.'), message=Argument(..., help='Message to be published.'), rpc=Option(False, help='Enable RPC mode and system output.'), is_factory=Option(False, '--factory', is_flag=True, help='Treat APP as an application factory.'))
Publish a message using the specified broker in a FastStream application.
This command publishes a message to a broker configured in a FastStream app instance. It supports various brokers and can handle extra arguments specific to each broker type. These are parsed and passed to the broker's publish method.
Source code in faststream/cli/main.py
@cli.command(
    context_settings={"allow_extra_args": True, "ignore_unknown_options": True}
)
def publish(
    ctx: typer.Context,
    app: str = typer.Argument(..., help="FastStream app instance, e.g., main:app."),
    message: str = typer.Argument(..., help="Message to be published."),
    rpc: bool = typer.Option(False, help="Enable RPC mode and system output."),
    is_factory: bool = typer.Option(
        False,
        "--factory",
        is_flag=True,
        help="Treat APP as an application factory.",
    ),
) -> None:
    """Publish a message using the specified broker in a FastStream application.

    This command publishes a message to a broker configured in a FastStream app instance.
    It supports various brokers and can handle extra arguments specific to each broker type.
    These are parsed and passed to the broker's publish method.
    """
    # NOTE(review): the docstring wording is kept as-is on purpose; Typer
    # presumably renders it as the command's help text - confirm before rewording.

    # Unknown CLI options collected in ctx.args are parsed together with the
    # app path; the result is the (possibly rewritten) app path plus a mapping
    # of extra publish options.
    app, extra = parse_cli_args(app, *ctx.args)

    # The positional message and the RPC flag travel in the same mapping and
    # override any same-named extra option.
    extra.update(message=message, rpc=rpc)

    try:
        # Validation lives inside the try so that failures are reported
        # through the same "Publish error" path as publishing failures.
        if not app:
            raise ValueError("App parameter is required.")
        if not message:
            raise ValueError("Message parameter is required.")

        _, application = import_from_string(app, is_factory=is_factory)
        broker = application.broker
        if not broker:
            raise ValueError("Broker instance not found in the app.")

        # publish_message is handed to anyio.run together with its arguments.
        response = anyio.run(publish_message, broker, extra)

        # The result is echoed only when RPC mode was requested.
        if rpc:
            typer.echo(response)
    except Exception as exc:
        # Top-level CLI boundary: report the failure and exit with status 1.
        typer.echo(f"Publish error: {exc}")
        sys.exit(1)
|