TestApp

faststream.broker.test.TestApp

TestApp(app: FastStream, run_extra_options: Optional[Dict[str, SettingField]] = None)

A test wrapper that starts and stops a FastStream application within an asynchronous context.

METHOD        DESCRIPTION
__init__      Initializes the TestApp object.
__aenter__    Enters the asynchronous context and starts the FastStream application.
__aexit__     Exits the asynchronous context and stops the FastStream application.

Initialize a class instance.

PARAMETER            DESCRIPTION
app                  An instance of the FastStream class.
                     TYPE: FastStream
run_extra_options    Optional dictionary of extra options for running the application.
                     TYPE: Optional[Dict[str, SettingField]]  DEFAULT: None

RETURNS    DESCRIPTION
None       None
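
The lifecycle described above (start the application on entering the asynchronous context, stop it on exit) can be illustrated with a minimal, standard-library-only sketch. It does not use FastStream itself: `FakeApp` and `SketchTestApp` are hypothetical stand-ins, the `start`/`stop` method names are assumptions, and returning the wrapped app from `__aenter__` is a choice made for this sketch rather than something this page confirms.

```python
import asyncio
from typing import Any, Dict, List, Optional


class FakeApp:
    """Hypothetical stand-in for a FastStream application (not the real class)."""

    def __init__(self) -> None:
        self.events: List[str] = []
        self.started_with: Dict[str, Any] = {}

    async def start(self, **options: Any) -> None:
        # Assumed method name; records that the app was started and with what options.
        self.started_with = options
        self.events.append("start")

    async def stop(self) -> None:
        # Assumed method name; records that the app was stopped.
        self.events.append("stop")


class SketchTestApp:
    """Illustrative wrapper that mirrors the documented TestApp lifecycle."""

    def __init__(
        self,
        app: FakeApp,
        run_extra_options: Optional[Dict[str, Any]] = None,
    ) -> None:
        self.app = app
        # None is normalized to an empty dict, as in the documented __init__.
        self._extra_options = run_extra_options or {}

    async def __aenter__(self) -> FakeApp:
        # Entering the context starts the application.
        await self.app.start(**self._extra_options)
        return self.app  # assumption of this sketch

    async def __aexit__(self, exc_type: Any, exc: Any, tb: Any) -> None:
        # Leaving the context stops the application, even if the body raised.
        await self.app.stop()


async def main() -> List[str]:
    app = FakeApp()
    async with SketchTestApp(app, {"env": "test"}) as running:
        running.events.append("inside")
    return app.events


events = asyncio.run(main())
print(events)
```

With the real class, the equivalent pattern would presumably be `async with TestApp(app): ...` inside an asynchronous test function, with `run_extra_options` supplying any extra run options the application expects.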

Source code in faststream/broker/test.py
def __init__(
    self,
    app: FastStream,
    run_extra_options: Optional[Dict[str, SettingField]] = None,
) -> None:
    """Initialize a class instance.

    Args:
        app: An instance of the FastStream class.
        run_extra_options: Optional dictionary of extra options for running the application.

    Returns:
        None

    """
    self.app = app
    self._extra_options = run_extra_options or {}

app instance-attribute

app: FastStream = app