Skip to content

StreamRoute

faststream.broker.fastapi.StreamRoute #

StreamRoute(path: Union[NameRequired, str, None], *extra: Union[NameRequired, str], endpoint: Union[Callable[P_HandlerParams, T_HandlerReturn], HandlerCallWrapper[MsgType, P_HandlerParams, T_HandlerReturn]], broker: BrokerAsyncUsecase[MsgType, Any], dependencies: Sequence[Depends], provider_factory: Callable[[], Any], **handle_kwargs: Any)

Bases: BaseRoute, Generic[MsgType, P_HandlerParams, T_HandlerReturn]

A class representing a stream route.

Initialize a class instance.

PARAMETER DESCRIPTION
path

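The route path: a string or an object exposing a `name` attribute. A falsy value (such as `None`) is stored as an empty string.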

TYPE: Union[NameRequired, str, None]

*extra

Additional positional arguments, forwarded to `broker.subscriber`.

TYPE: Union[NameRequired, str] DEFAULT: ()

endpoint

The endpoint of the instance.

TYPE: Union[Callable[P_HandlerParams, T_HandlerReturn], HandlerCallWrapper[MsgType, P_HandlerParams, T_HandlerReturn]]

broker

The broker of the instance.

TYPE: BrokerAsyncUsecase[MsgType, Any]

dependencies

Dependencies inserted ahead of the endpoint's own dependencies, keeping the order in which they are given.

TYPE: Sequence[Depends]

provider_factory

Provider factory for dependency overrides.

TYPE: Callable[[], Any]

**handle_kwargs

Additional keyword arguments, forwarded to `broker.subscriber`.

TYPE: Any DEFAULT: {}

RETURNS DESCRIPTION
None

None.

Source code in faststream/broker/fastapi/route.py
def __init__(
    self,
    path: Union[NameRequired, str, None],
    *extra: Union[NameRequired, str],
    endpoint: Union[
        Callable[P_HandlerParams, T_HandlerReturn],
        HandlerCallWrapper[MsgType, P_HandlerParams, T_HandlerReturn],
    ],
    broker: BrokerAsyncUsecase[MsgType, Any],
    dependencies: Sequence[params.Depends],
    provider_factory: Callable[[], Any],
    **handle_kwargs: Any,
) -> None:
    """Build the route's dependant and subscribe its handler on the broker.

    Args:
        path: Route path, either a string or an object exposing ``.name``.
            A falsy value is stored as ``""``.
        *extra: Positional arguments passed through to ``broker.subscriber``.
        endpoint: The callable to subscribe, or a ``HandlerCallWrapper``;
            for a wrapper, its ``_original_call`` is used to build the
            dependant and is then replaced by the session-wrapped call.
        broker: Broker whose ``subscriber`` decorator registers the handler.
        dependencies: Dependencies placed ahead of the endpoint's own ones,
            keeping the order in which they are given.
        provider_factory: Provider factory for dependency overrides; handed
            to ``StreamMessage.get_session`` together with the dependant.
        **handle_kwargs: Keyword arguments passed through to
            ``broker.subscriber``.

    Returns:
        None.
    """
    self.path = path or ""
    self.broker = broker

    # Dependency resolution needs a plain string, so unwrap name objects.
    route_name = self.path if isinstance(self.path, str) else self.path.name

    is_wrapped = isinstance(endpoint, HandlerCallWrapper)
    user_func = endpoint._original_call if is_wrapped else endpoint

    dependant = get_dependant(path=route_name, call=user_func)

    # Walking the sequence backwards while inserting at index 0 leaves the
    # extra dependencies at the front in their original order.
    for extra_dep in reversed(dependencies):
        sub_dependant = get_parameterless_sub_dependant(
            depends=extra_dep,
            path=route_name,
        )
        dependant.dependencies.insert(0, sub_dependant)
    self.dependant = dependant

    session_call = StreamMessage.get_session(dependant, provider_factory)
    session_call = wraps(user_func)(session_call)

    if is_wrapped:
        # Reuse the existing wrapper, swapping in the session-aware call.
        endpoint._original_call = session_call
        handler = endpoint
    else:
        handler = session_call

    subscribe = broker.subscriber(
        *extra,
        _raw=True,
        _get_dependant=lambda call: dependant,
        **handle_kwargs,
    )
    self.handler = subscribe(handler)  # type: ignore[arg-type]

broker instance-attribute #

broker = broker

dependant instance-attribute #

dependant = dependant

handler instance-attribute #

handler: HandlerCallWrapper[MsgType, P_HandlerParams, T_HandlerReturn] = subscriber(*extra, _raw=True, _get_dependant=lambda call: dependant, **handle_kwargs)(handler)

path instance-attribute #

path = path or ''

handle async #

handle(scope: Scope, receive: Receive, send: Send) -> None
Source code in starlette/routing.py
async def handle(self, scope: Scope, receive: Receive, send: Send) -> None:
    """Placeholder coroutine: always raises ``NotImplementedError``."""
    raise NotImplementedError  # pragma: no cover

matches #

matches(scope: Scope) -> tuple[Match, Scope]
Source code in starlette/routing.py
def matches(self, scope: Scope) -> tuple[Match, Scope]:
    """Placeholder: always raises ``NotImplementedError``."""
    raise NotImplementedError  # pragma: no cover

url_path_for #

url_path_for(name: str, /, **path_params: Any) -> URLPath
Source code in starlette/routing.py
def url_path_for(self, name: str, /, **path_params: typing.Any) -> URLPath:
    """Placeholder: always raises ``NotImplementedError``."""
    raise NotImplementedError  # pragma: no cover