def __init__(
    self,
    call: Annotated[
        Union[
            Callable[..., "AioPikaSendableMessage"],
            Callable[..., Awaitable["AioPikaSendableMessage"]],
        ],
        Doc(
            "Message handler function to wrap the same with `@broker.subscriber(...)` way."
        ),
    ],
    queue: Annotated[
        Union[str, "RabbitQueue"],
        Doc(
            "RabbitMQ queue to listen. "
            "**FastStream** declares and binds queue object to `exchange` "
            "automatically if it is not passive (by default)."
        ),
    ],
    exchange: Annotated[
        Union[str, "RabbitExchange", None],
        Doc(
            "RabbitMQ exchange to bind queue to. "
            "Uses default exchange if not presented. "
            "**FastStream** declares exchange object automatically "
            "if it is not passive (by default)."
        ),
    ] = None,
    *,
    publishers: Annotated[
        Iterable[RabbitPublisher],
        Doc("RabbitMQ publishers to broadcast the handler result."),
    ] = (),
    consume_args: Annotated[
        Optional["AnyDict"],
        Doc("Extra consumer arguments to use in `queue.consume(...)` method."),
    ] = None,
    reply_config: Annotated[
        Optional["ReplyConfig"],
        Doc("Extra options to use at replies publishing."),
        deprecated(
            "Deprecated in **FastStream 0.5.16**. "
            "Please, use `RabbitResponse` object as a handler return instead. "
            "Argument will be removed in **FastStream 0.6.0**."
        ),
    ] = None,
    # broker arguments
    dependencies: Annotated[
        Iterable["Depends"],
        Doc("Dependencies list (`[Depends(),]`) to apply to the subscriber."),
    ] = (),
    parser: Annotated[
        Optional["CustomCallable"],
        Doc("Parser to map original **IncomingMessage** Msg to FastStream one."),
    ] = None,
    decoder: Annotated[
        Optional["CustomCallable"],
        Doc("Function to decode FastStream msg bytes body to python objects."),
    ] = None,
    middlewares: Annotated[
        Iterable["SubscriberMiddleware[RabbitMessage]"],
        Doc("Subscriber middlewares to wrap incoming message processing."),
    ] = (),
    filter: Annotated[
        "Filter[RabbitMessage]",
        Doc("Overload subscriber to consume various messages from the same source."),
        deprecated(
            "Deprecated in **FastStream 0.5.0**. "
            "Please, create `subscriber` object and use it explicitly instead. "
            "Argument will be removed in **FastStream 0.6.0**."
        ),
    ] = default_filter,
    retry: Annotated[
        Union[bool, int],
        Doc("Whether to `nack` message at processing exception."),
    ] = False,
    no_ack: Annotated[
        bool,
        Doc("Whether to disable **FastStream** autoacknowledgement logic or not."),
    ] = False,
    no_reply: Annotated[
        bool,
        Doc("Whether to disable **FastStream** RPC and Reply To auto responses or not."),
    ] = False,
    # AsyncAPI information
    title: Annotated[
        Optional[str],
        Doc("AsyncAPI subscriber object title."),
    ] = None,
    description: Annotated[
        Optional[str],
        Doc("AsyncAPI subscriber object description. Uses decorated docstring as default."),
    ] = None,
    include_in_schema: Annotated[
        bool,
        Doc("Whetever to include operation in AsyncAPI schema or not."),
    ] = True,
) -> None:
    """Hand the handler and all subscriber options to the parent initializer.

    Nothing is validated or transformed at this level: `call` is passed
    positionally and every other argument is forwarded by keyword under
    its own name. The meaning of each parameter is given by the
    `Doc(...)` metadata in the signature. `reply_config` and `filter`
    carry `deprecated(...)` markers in their annotations but are still
    forwarded unchanged.
    """
    super().__init__(
        call,
        # RabbitMQ-specific options
        publishers=publishers,
        queue=queue,
        exchange=exchange,
        consume_args=consume_args,
        reply_config=reply_config,
        # generic broker options
        dependencies=dependencies,
        parser=parser,
        decoder=decoder,
        middlewares=middlewares,
        filter=filter,
        retry=retry,
        no_ack=no_ack,
        no_reply=no_reply,
        # AsyncAPI options
        title=title,
        description=description,
        include_in_schema=include_in_schema,
    )