Skip to content

publish

faststream.cli.main.publish

publish(ctx: Context, app: str = typer.Argument(..., help='FastStream app instance, e.g., main:app'), message: str = typer.Argument(..., help='Message to be published'), rpc: bool = typer.Option(False, help='Enable RPC mode and system output')) -> None

Publish a message using the specified broker in a FastStream application.

This command publishes a message to a broker configured in a FastStream app instance. It supports various brokers and can handle extra arguments specific to each broker type.

PARAMETER DESCRIPTION
ctx

The Typer context for the command.

TYPE: Context

app

The FastStream application instance path, in the format 'module:instance'.

TYPE: str DEFAULT: Argument(..., help='FastStream app instance, e.g., main:app')

message

The message to be published.

TYPE: str DEFAULT: Argument(..., help='Message to be published')

rpc

If True, enables RPC mode and displays system output.

TYPE: bool DEFAULT: Option(False, help='Enable RPC mode and system output')

The command allows extra CLI arguments to be passed, which are broker-specific. These are parsed and passed to the broker's publish method.

Source code in faststream/cli/main.py
@cli.command(
    context_settings={
        "allow_extra_args": True,
        "ignore_unknown_options": True,
    },
)
def publish(
    ctx: typer.Context,
    app: str = typer.Argument(..., help="FastStream app instance, e.g., main:app"),
    message: str = typer.Argument(..., help="Message to be published"),
    rpc: bool = typer.Option(False, help="Enable RPC mode and system output"),
) -> None:
    """Publish a message using the specified broker in a FastStream application.

    This command publishes a message to a broker configured in a FastStream app instance.
    It supports various brokers and can handle extra arguments specific to each broker type.

    Args:
        ctx (typer.Context): The Typer context for the command.
        app (str): The FastStream application instance path, in the format 'module:instance'.
        message (str): The message to be published.
        rpc (bool): If True, enables RPC mode and displays system output.

    The command allows extra CLI arguments to be passed, which are broker-specific.
    These are parsed and passed to the broker's publish method.
    """
    # ctx.args carries the extra, broker-specific CLI arguments; parse_cli_args
    # turns them (together with the app path) into the options handed to
    # publish_message. The explicit arguments are added on top of those.
    app_path, publish_kwargs = parse_cli_args(app, *ctx.args)
    publish_kwargs["message"] = message
    publish_kwargs["rpc"] = rpc

    # (value, error text) pairs checked in order; the first empty value aborts.
    required = (
        (app_path, "App parameter is required."),
        (message, "Message parameter is required."),
    )

    try:
        for value, complaint in required:
            if not value:
                raise ValueError(complaint)

        _, application = import_from_string(app_path)
        if not application.broker:
            raise ValueError("Broker instance not found in the app.")

        reply = anyio.run(publish_message, application, publish_kwargs)

        # The publish result is only shown to the user in RPC mode.
        if rpc:
            typer.echo(reply)

    except Exception as e:
        # Top-level CLI boundary: report any failure and exit non-zero.
        typer.echo(f"Publish error: {e}")
        sys.exit(1)