def __init__(
self,
call: Annotated[
Union[
Callable[..., "SendableMessage"],
Callable[..., Awaitable["SendableMessage"]],
],
Doc(
"Message handler function "
"to wrap the same with `@broker.subscriber(...)` way."
),
],
channel: Annotated[
Union["PubSub", str, None],
Doc("Redis PubSub object name to send message."),
] = None,
*,
publishers: Annotated[
Iterable["RedisPublisher"],
Doc("Redis publishers to broadcast the handler result."),
] = (),
list: Annotated[
Union["ListSub", str, None],
Doc("Redis List object name to send message."),
] = None,
stream: Annotated[
Union["StreamSub", str, None],
Doc("Redis Stream object name to send message."),
] = None,
# broker arguments
dependencies: Annotated[
Iterable["Depends"],
Doc("Dependencies list (`[Depends(),]`) to apply to the subscriber."),
] = (),
parser: Annotated[
Optional["CustomCallable"],
Doc(
"Parser to map original **aio_pika.IncomingMessage** Msg to FastStream one."
),
] = None,
decoder: Annotated[
Optional["CustomCallable"],
Doc("Function to decode FastStream msg bytes body to python objects."),
] = None,
middlewares: Annotated[
Sequence["SubscriberMiddleware[UnifyRedisMessage]"],
Doc("Subscriber middlewares to wrap incoming message processing."),
] = (),
filter: Annotated[
"Filter[UnifyRedisMessage]",
Doc(
"Overload subscriber to consume various messages from the same source."
),
deprecated(
"Deprecated in **FastStream 0.5.0**. "
"Please, create `subscriber` object and use it explicitly instead. "
"Argument will be removed in **FastStream 0.6.0**."
),
] = default_filter,
retry: Annotated[
bool,
Doc("Whether to `nack` message at processing exception."),
] = False,
no_ack: Annotated[
bool,
Doc("Whether to disable **FastStream** autoacknowledgement logic or not."),
] = False,
no_reply: Annotated[
bool,
Doc(
"Whether to disable **FastStream** RPC and Reply To auto responses or not."
),
] = False,
# AsyncAPI information
title: Annotated[
Optional[str],
Doc("AsyncAPI subscriber object title."),
] = None,
description: Annotated[
Optional[str],
Doc(
"AsyncAPI subscriber object description. "
"Uses decorated docstring as default."
),
] = None,
include_in_schema: Annotated[
bool,
Doc("Whetever to include operation in AsyncAPI schema or not."),
] = True,
) -> None:
super().__init__(
call,
channel=channel,
publishers=publishers,
list=list,
stream=stream,
dependencies=dependencies,
parser=parser,
decoder=decoder,
middlewares=middlewares,
filter=filter,
retry=retry,
no_ack=no_ack,
no_reply=no_reply,
title=title,
description=description,
include_in_schema=include_in_schema,
)