publish(ctx: Context, app: str = typer.Argument(..., help='FastStream app instance, e.g., main:app'), message: str = typer.Argument(..., help='Message to be published'), rpc: bool = typer.Option(False, help='Enable RPC mode and system output')) -> None
Publish a message using the specified broker in a FastStream application.
This command publishes a message to a broker configured in a FastStream app instance. It supports various brokers and can handle extra arguments specific to each broker type.
| PARAMETER | DESCRIPTION | TYPE | DEFAULT |
| ctx | The Typer context for the command. | Context | (none) |
| app | The FastStream application instance path, in the format 'module:instance'. | str | Argument(..., help='FastStream app instance, e.g., main:app') |
| message | The message to be published. | str | Argument(..., help='Message to be published') |
| rpc | If True, enables RPC mode and displays system output. | bool | Option(False, help='Enable RPC mode and system output') |
The command allows extra CLI arguments to be passed, which are broker-specific. These are parsed and passed to the broker's publish method.
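To make the idea of broker-specific extra arguments concrete, here is a minimal sketch of how leftover CLI tokens could be turned into keyword arguments. The function `parse_extra_options` is hypothetical and written only for illustration; it is not FastStream's `parse_cli_args`, whose exact rules may differ. The option names in the usage note (`--topic`, `--persist`) are likewise assumptions.

```python
from __future__ import annotations

from typing import Any


def parse_extra_options(args: list[str]) -> dict[str, Any]:
    """Hypothetical helper: map tokens such as ['--topic', 'test'] to a dict."""
    extra: dict[str, Any] = {}
    pending: str | None = None  # option name still waiting for a value

    for token in args:
        if token.startswith("--"):
            # Accept both "--key value" and "--key=value".
            name, sep, value = token[2:].partition("=")
            name = name.replace("-", "_")
            if sep:
                extra[name] = value
                pending = None
            else:
                # Treat as a boolean flag unless a value follows.
                extra[name] = True
                pending = name
        elif pending is not None:
            extra[pending] = token
            pending = None
        # Bare tokens with no preceding option are ignored in this sketch.

    return extra
```

Under these assumptions, `parse_extra_options(["--topic", "test", "--persist"])` yields a dictionary with `topic` set to `"test"` and `persist` set to `True`, which could then be merged with `message` and `rpc` before calling the broker's publish method.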
Source code in faststream/cli/main.py
@cli.command(
    context_settings={"allow_extra_args": True, "ignore_unknown_options": True}
)
def publish(
    ctx: typer.Context,
    app: str = typer.Argument(..., help="FastStream app instance, e.g., main:app"),
    message: str = typer.Argument(..., help="Message to be published"),
    rpc: bool = typer.Option(False, help="Enable RPC mode and system output"),
) -> None:
    """Publish a message using the specified broker in a FastStream application.

    This command publishes a message to a broker configured in a FastStream app instance.
    It supports various brokers and can handle extra arguments specific to each broker type.

    Args:
        ctx (typer.Context): The Typer context for the command.
        app (str): The FastStream application instance path, in the format 'module:instance'.
        message (str): The message to be published.
        rpc (bool): If True, enables RPC mode and displays system output.

    The command allows extra CLI arguments to be passed, which are broker-specific.
    These are parsed and passed to the broker's publish method.
    """
    app, extra = parse_cli_args(app, *ctx.args)
    extra["message"] = message
    extra["rpc"] = rpc

    try:
        if not app:
            raise ValueError("App parameter is required.")
        if not message:
            raise ValueError("Message parameter is required.")

        _, app_obj = import_from_string(app)
        if not app_obj.broker:
            raise ValueError("Broker instance not found in the app.")

        result = anyio.run(publish_message, app_obj, extra)

        if rpc:
            typer.echo(result)

    except Exception as e:
        typer.echo(f"Publish error: {e}")
        sys.exit(1)
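The source above resolves the `app` argument, given as 'module:instance', through `import_from_string`. As a rough illustration of what such a resolution step might involve, the sketch below uses only the standard library. The name `resolve_instance` is hypothetical and this is not FastStream's implementation; it only shows one plausible way to split the path, import the module, and look up the attribute.

```python
from __future__ import annotations

import importlib
from typing import Any


def resolve_instance(path: str) -> tuple[Any, Any]:
    """Hypothetical helper: resolve 'module:instance' to (module, object)."""
    module_name, sep, attr = path.partition(":")
    if not sep or not module_name or not attr:
        raise ValueError(f"Expected 'module:instance', got {path!r}")

    # Import the module by its dotted name, e.g. "main" or "pkg.service".
    module = importlib.import_module(module_name)

    try:
        return module, getattr(module, attr)
    except AttributeError:
        raise ValueError(
            f"{attr!r} not found in module {module_name!r}"
        ) from None
```

Raising `ValueError` for malformed paths fits the error handling in `publish`, where any exception is reported as a "Publish error" and the process exits with status 1.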