Skip to content

StreamRoute

faststream.broker.fastapi.StreamRoute #

StreamRoute(
    path: Union[NameRequired, str, None],
    *extra: Union[NameRequired, str],
    endpoint: Union[
        Callable[P_HandlerParams, T_HandlerReturn],
        HandlerCallWrapper[
            MsgType, P_HandlerParams, T_HandlerReturn
        ],
    ],
    broker: BrokerAsyncUsecase[MsgType, Any],
    dependencies: Sequence[Depends] = (),
    dependency_overrides_provider: Optional[Any] = None,
    **handle_kwargs: Any
)

Bases: BaseRoute, Generic[MsgType, P_HandlerParams, T_HandlerReturn]

A class representing a stream route.

Initialize a class instance.

PARAMETER DESCRIPTION
path

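The subscription target passed to `broker.subscriber`. A falsy value (such as `None`) is stored on the instance as an empty string.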

TYPE: Union[NameRequired, str, None]

*extra

Additional positional arguments forwarded to `broker.subscriber` after `path`.

TYPE: Union[NameRequired, str] DEFAULT: ()

endpoint

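The handler to subscribe: either a plain callable or an existing `HandlerCallWrapper`, whose `_original_call` is replaced by the dependency-resolving wrapper.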

TYPE: Union[Callable[P_HandlerParams, T_HandlerReturn], HandlerCallWrapper[MsgType, P_HandlerParams, T_HandlerReturn]]

broker

The broker whose `subscriber` decorator registers the handler; also stored as the `broker` attribute.

TYPE: BrokerAsyncUsecase[MsgType, Any]

dependencies

Extra dependencies inserted, in the given order, at the front of the endpoint's dependency list.

TYPE: Sequence[Depends] DEFAULT: ()

dependency_overrides_provider

Optional object passed to `StreamMessage.get_session` as the provider of dependency overrides.

TYPE: Optional[Any] DEFAULT: None

**handle_kwargs

Additional keyword arguments forwarded to `broker.subscriber`.

TYPE: Any DEFAULT: {}

RETURNS DESCRIPTION
None

None.

Source code in faststream/broker/fastapi/route.py
def __init__(
    self,
    path: Union[NameRequired, str, None],
    *extra: Union[NameRequired, str],
    endpoint: Union[
        Callable[P_HandlerParams, T_HandlerReturn],
        HandlerCallWrapper[MsgType, P_HandlerParams, T_HandlerReturn],
    ],
    broker: BrokerAsyncUsecase[MsgType, Any],
    dependencies: Sequence[params.Depends] = (),
    dependency_overrides_provider: Optional[Any] = None,
    **handle_kwargs: Any,
) -> None:
    """Build the route and subscribe its endpoint on the broker.

    Args:
        path: Subscription target handed to ``broker.subscriber``. A falsy
            value is stored on the instance as ``""``.
        *extra: Further positional arguments forwarded to
            ``broker.subscriber``.
        endpoint: A plain callable, or a ``HandlerCallWrapper`` whose
            ``_original_call`` is swapped for the session-wrapped call.
        broker: Broker used to create the subscriber; kept as
            ``self.broker``.
        dependencies: Extra dependencies placed, in the given order, ahead
            of the endpoint's own dependencies.
        dependency_overrides_provider: Passed through to
            ``StreamMessage.get_session``.
        **handle_kwargs: Further keyword arguments forwarded to
            ``broker.subscriber``.

    Returns:
        None.
    """
    self.path = path or ""
    self.broker = broker

    # The dependant is keyed by a plain string name, so unwrap name-carrying
    # path objects.
    if isinstance(self.path, str):
        route_name = self.path
    else:
        route_name = self.path.name

    user_call = (
        endpoint._original_call
        if isinstance(endpoint, HandlerCallWrapper)
        else endpoint
    )

    route_dependant = get_dependant(path=route_name, call=user_call)
    # Walking the sequence backwards while inserting at index 0 leaves the
    # extra dependencies at the front in their original order.
    for extra_dep in reversed(dependencies):
        sub_dependant = get_parameterless_sub_dependant(
            depends=extra_dep,
            path=route_name,
        )
        route_dependant.dependencies.insert(0, sub_dependant)
    self.dependant = route_dependant

    # Make the session wrapper carry the metadata of the user's function.
    copy_metadata = wraps(user_call)
    session_call = copy_metadata(
        StreamMessage.get_session(
            route_dependant,
            dependency_overrides_provider,
        )
    )

    if not isinstance(endpoint, HandlerCallWrapper):
        target = session_call
    else:
        # Reuse the existing wrapper, pointing it at the session-aware call.
        endpoint._original_call = session_call
        target = endpoint

    subscribe = broker.subscriber(
        path,
        *extra,
        _raw=True,
        _get_dependant=lambda call: route_dependant,
        **handle_kwargs,
    )
    self.handler = subscribe(target)  # type: ignore[arg-type]

broker instance-attribute #

broker = broker

dependant instance-attribute #

dependant = dependant

handler instance-attribute #

handler: HandlerCallWrapper[
    MsgType, P_HandlerParams, T_HandlerReturn
] = subscriber(
    path,
    *extra,
    _raw=True,
    _get_dependant=lambda call: dependant,
    **handle_kwargs
)(
    handler
)

path instance-attribute #

path = path or ''

handle async #

handle(scope: Scope, receive: Receive, send: Send) -> None
Source code in starlette/routing.py
async def handle(self, scope: Scope, receive: Receive, send: Send) -> None:
    """Placeholder handler with no implementation here.

    Raises:
        NotImplementedError: Always, once the coroutine is awaited.
    """
    raise NotImplementedError  # pragma: no cover

matches #

matches(scope: Scope) -> Tuple[Match, Scope]
Source code in starlette/routing.py
def matches(self, scope: Scope) -> typing.Tuple[Match, Scope]:
    """Placeholder scope matcher with no implementation here.

    Raises:
        NotImplementedError: Always.
    """
    raise NotImplementedError  # pragma: no cover

url_path_for #

url_path_for(name: str, /, **path_params: Any) -> URLPath
Source code in starlette/routing.py
def url_path_for(self, name: str, /, **path_params: typing.Any) -> URLPath:
    """Placeholder URL builder with no implementation here.

    Raises:
        NotImplementedError: Always.
    """
    raise NotImplementedError  # pragma: no cover