Skip to content

publish

faststream.cli.main.publish #

publish(ctx, app=Argument(..., help='FastStream app instance, e.g., main:app.'), message=Argument(..., help='Message to be published.'), rpc=Option(False, help='Enable RPC mode and system output.'), is_factory=Option(False, '--factory', is_flag=True, help='Treat APP as an application factory.'))

Publish a message using the specified broker in a FastStream application.

This command publishes a message to a broker configured in a FastStream app instance. It supports various brokers and can handle extra arguments specific to each broker type. These are parsed and passed to the broker's publish method.

Source code in faststream/cli/main.py
@cli.command(
    context_settings={"allow_extra_args": True, "ignore_unknown_options": True}
)
def publish(
    ctx: typer.Context,
    app: str = typer.Argument(..., help="FastStream app instance, e.g., main:app."),
    message: str = typer.Argument(..., help="Message to be published."),
    rpc: bool = typer.Option(False, help="Enable RPC mode and system output."),
    is_factory: bool = typer.Option(
        False,
        "--factory",
        is_flag=True,
        help="Treat APP as an application factory.",
    ),
) -> None:
    """Publish a message using the specified broker in a FastStream application.

    This command publishes a message to a broker configured in a FastStream app instance.
    It supports various brokers and can handle extra arguments specific to each broker type.
    These are parsed and passed to the broker's publish method.
    """
    app, extra = parse_cli_args(app, *ctx.args)
    extra["message"] = message
    extra["rpc"] = rpc

    try:
        if not app:
            raise ValueError("App parameter is required.")
        if not message:
            raise ValueError("Message parameter is required.")

        _, app_obj = import_from_string(app, is_factory=is_factory)

        if not app_obj.broker:
            raise ValueError("Broker instance not found in the app.")

        result = anyio.run(publish_message, app_obj.broker, extra)

        if rpc:
            typer.echo(result)

    except Exception as e:
        typer.echo(f"Publish error: {e}")
        sys.exit(1)