LogicRedisHandler(
    *,
    log_context_builder: Callable[
        [StreamMessage[Any]], Dict[str, str]
    ],
    graceful_timeout: Optional[float] = None,
    channel: Optional[PubSub] = None,
    list: Optional[ListSub] = None,
    stream: Optional[StreamSub] = None,
    description: Optional[str] = None,
    title: Optional[str] = None,
    include_in_schema: bool = True
)
Bases: AsyncHandler[AnyRedisDict]
A handler that consumes messages from a Redis Pub/Sub channel, list, or stream and dispatches them to the registered subscriber calls.
Initialize the Redis handler.
Source code in faststream/redis/handler.py
def __init__(
    self,
    *,
    log_context_builder: Callable[[StreamMessage[Any]], Dict[str, str]],
    graceful_timeout: Optional[float] = None,
    # Redis info
    channel: Optional[PubSub] = None,
    list: Optional[ListSub] = None,
    stream: Optional[StreamSub] = None,
    # AsyncAPI information
    description: Optional[str] = None,
    title: Optional[str] = None,
    include_in_schema: bool = True,
) -> None:
    """Initialize the Redis handler.

    Args:
        log_context_builder: The log context builder.
        graceful_timeout: The graceful timeout.
        channel: The channel.
        list: The list.
        stream: The stream.
        description: The description.
        title: The title.
        include_in_schema: Whether to include in schema.
    """
    self.channel = channel
    self.list_sub = list
    self.stream_sub = stream

    self.subscription = None
    self.task = None
    self.last_id = stream.last_id if stream else "$"

    super().__init__(
        log_context_builder=log_context_builder,
        description=description,
        title=title,
        include_in_schema=include_in_schema,
        graceful_timeout=graceful_timeout,
    )
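In application code this handler is rarely constructed by hand; the broker creates one per subscription. Below is a minimal sketch of how the constructor's Redis arguments map to subscriptions, assuming the public FastStream 0.x `RedisBroker` API and a local Redis instance (the channel name is illustrative):

```python
from faststream import FastStream
from faststream.redis import RedisBroker

broker = RedisBroker("redis://localhost:6379")  # assumed local Redis instance
app = FastStream(broker)

# Subscribing by channel name creates a handler whose `channel` is set;
# passing `list=...` or `stream=...` instead would populate
# `list_sub` / `stream_sub`.
@broker.subscriber("test-channel")
async def handle(msg: str) -> None:
    print(msg)
```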
call_name property
Returns the name of the handler call.
calls instance-attribute
calls: List[
    Tuple[
        HandlerCallWrapper[MsgType, Any, SendableMessage],
        Callable[[StreamMessage[MsgType]], Awaitable[bool]],
        AsyncParser[MsgType, Any],
        AsyncDecoder[StreamMessage[MsgType]],
        Sequence[Callable[[Any], BaseMiddleware]],
        CallModel[Any, SendableMessage],
    ]
]
channel instance-attribute
description property
Returns the description of the handler.
global_middlewares instance-attribute
graceful_timeout instance-attribute
include_in_schema instance-attribute
last_id instance-attribute
last_id = stream.last_id if stream else '$'
list_sub instance-attribute
log_context_builder instance-attribute
running instance-attribute
stream_sub instance-attribute
subscription instance-attribute
add_call
add_call(
    *,
    handler: HandlerCallWrapper[
        AnyRedisDict, P_HandlerParams, T_HandlerReturn
    ],
    dependant: CallModel[P_HandlerParams, T_HandlerReturn],
    parser: Optional[
        CustomParser[AnyRedisDict, RedisMessage]
    ],
    decoder: Optional[CustomDecoder[RedisMessage]],
    filter: Filter[RedisMessage],
    middlewares: Optional[
        Sequence[Callable[[AnyRedisDict], BaseMiddleware]]
    ]
) -> None
Source code in faststream/redis/handler.py
def add_call(
    self,
    *,
    handler: HandlerCallWrapper[AnyRedisDict, P_HandlerParams, T_HandlerReturn],
    dependant: CallModel[P_HandlerParams, T_HandlerReturn],
    parser: Optional[CustomParser[AnyRedisDict, RedisMessage]],
    decoder: Optional[CustomDecoder[RedisMessage]],
    filter: Filter[RedisMessage],
    middlewares: Optional[Sequence[Callable[[AnyRedisDict], BaseMiddleware]]],
) -> None:
    super().add_call(
        handler=handler,
        parser=resolve_custom_func(parser, RedisParser.parse_message),  # type: ignore[arg-type]
        decoder=resolve_custom_func(decoder, RedisParser.decode_message),
        filter=filter,  # type: ignore[arg-type]
        dependant=dependant,
        middlewares=middlewares,
    )
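The `filter` argument is what lets several decorated functions share one subscription: `add_call` stores each handler together with its filter, and `consume` dispatches the message to the first call whose filter matches. A sketch of the pattern, assuming the subscriber-level `filter=` option from the public FastStream API; the channel name and predicate are illustrative:

```python
async def is_json(msg) -> bool:
    # illustrative predicate: route structured payloads separately
    return msg.content_type == "application/json"


@broker.subscriber("test-channel", filter=is_json)
async def handle_json(msg: dict) -> None:
    print("JSON payload:", msg)


@broker.subscriber("test-channel")  # default filter catches everything else
async def handle_other(msg: str) -> None:
    print("other payload:", msg)
```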
close async
Source code in faststream/redis/handler.py
async def close(self) -> None:
    await super().close()

    if self.task is not None:
        if not self.task.done():
            self.task.cancel()
        self.task = None

    if self.subscription is not None:
        await self.subscription.unsubscribe()
        await self.subscription.aclose()  # type: ignore[attr-defined]
        self.subscription = None
consume async
consume(msg: MsgType) -> SendableMessage
Consume a message asynchronously.
| PARAMETER | DESCRIPTION |
| --- | --- |
| msg | The message to be consumed. TYPE: MsgType |

| RETURNS | DESCRIPTION |
| --- | --- |
| SendableMessage | The sendable message. |

| RAISES | DESCRIPTION |
| --- | --- |
| StopConsume | If the consumption needs to be stopped. |
| Exception | If an error occurs during consumption. |
Source code in faststream/broker/handler.py
@override
async def consume(self, msg: MsgType) -> SendableMessage:  # type: ignore[override]
    """Consume a message asynchronously.

    Args:
        msg: The message to be consumed.

    Returns:
        The sendable message.

    Raises:
        StopConsume: If the consumption needs to be stopped.
        Exception: If an error occurs during consumption.
    """
    result: Optional[WrappedReturn[SendableMessage]] = None
    result_msg: SendableMessage = None

    if not self.running:
        return result_msg

    log_context_tag: Optional["Token[Any]"] = None
    async with AsyncExitStack() as stack:
        stack.enter_context(self.lock)
        stack.enter_context(context.scope("handler_", self))

        gl_middlewares: List[BaseMiddleware] = [
            await stack.enter_async_context(m(msg)) for m in self.global_middlewares
        ]

        logged = False
        processed = False
        for handler, filter_, parser, decoder, middlewares, _ in self.calls:
            local_middlewares: List[BaseMiddleware] = [
                await stack.enter_async_context(m(msg)) for m in middlewares
            ]

            all_middlewares = gl_middlewares + local_middlewares

            # TODO: add parser & decoder caches
            message = await parser(msg)

            if not logged:  # pragma: no branch
                log_context_tag = context.set_local(
                    "log_context",
                    self.log_context_builder(message),
                )

            message.decoded_body = await decoder(message)
            message.processed = processed

            if await filter_(message):
                assert (  # nosec B101
                    not processed
                ), "You can't process a message with multiple consumers"

                try:
                    async with AsyncExitStack() as consume_stack:
                        for m_consume in all_middlewares:
                            message.decoded_body = (
                                await consume_stack.enter_async_context(
                                    m_consume.consume_scope(message.decoded_body)
                                )
                            )

                        result = await cast(
                            Awaitable[Optional[WrappedReturn[SendableMessage]]],
                            handler.call_wrapped(message),
                        )

                    if result is not None:
                        result_msg, pub_response = result

                        # TODO: suppress all publishing errors and raise them after all publishers will be tried
                        for publisher in (pub_response, *handler._publishers):
                            if publisher is not None:
                                async with AsyncExitStack() as pub_stack:
                                    result_to_send = result_msg

                                    for m_pub in all_middlewares:
                                        result_to_send = (
                                            await pub_stack.enter_async_context(
                                                m_pub.publish_scope(result_to_send)
                                            )
                                        )

                                    await publisher.publish(
                                        message=result_to_send,
                                        correlation_id=message.correlation_id,
                                    )

                except StopConsume:
                    await self.close()
                    handler.trigger()

                except HandlerException as e:  # pragma: no cover
                    handler.trigger()
                    raise e

                except Exception as e:
                    handler.trigger(error=e)
                    raise e

                else:
                    handler.trigger(result=result[0] if result else None)
                    message.processed = processed = True
                    if IS_OPTIMIZED:  # pragma: no cover
                        break

        assert not self.running or processed, "You have to consume message"  # nosec B101

    if log_context_tag is not None:
        context.reset_local("log_context", log_context_tag)

    return result_msg
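`consume` wraps both the handler call and any result publishing in middleware scopes (`consume_scope` and `publish_scope`). A hedged sketch of hooking into those scopes, assuming the `BaseMiddleware.on_consume` / `on_publish` hooks from the public FastStream API:

```python
from faststream import BaseMiddleware
from faststream.redis import RedisBroker


class LoggingMiddleware(BaseMiddleware):
    async def on_consume(self, msg):
        # entered via consume_scope() before the decoded body reaches the handler
        print("consuming:", msg)
        return await super().on_consume(msg)

    async def on_publish(self, msg):
        # entered via publish_scope() before the handler result is published
        print("publishing:", msg)
        return await super().on_publish(msg)


broker = RedisBroker("redis://localhost:6379", middlewares=(LoggingMiddleware,))
```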
get_payloads
Get the payloads of the handler.
Source code in faststream/broker/handler.py
def get_payloads(self) -> List[Tuple[AnyDict, str]]:
    """Get the payloads of the handler."""
    payloads: List[Tuple[AnyDict, str]] = []

    for h, _, _, _, _, dep in self.calls:
        body = parse_handler_params(
            dep, prefix=f"{self._title or self.call_name}:Message"
        )
        payloads.append((body, to_camelcase(unwrap(h._original_call).__name__)))

    return payloads
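These payloads end up in the generated AsyncAPI document. A sketch of inspecting them, assuming the `get_app_schema` helper lives in `faststream.asyncapi.generate` (check the import path against your FastStream version):

```python
from faststream.asyncapi.generate import get_app_schema  # assumed helper location

schema = get_app_schema(app)         # `app` is the FastStream application
print(list(schema.channels.keys()))  # one channel entry per subscriber/publisher
```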
get_routing_hash staticmethod
Source code in faststream/redis/handler.py
@staticmethod
def get_routing_hash(channel: Hashable) -> int:
    return hash(channel)
name
Returns the name of the API operation.
Source code in faststream/asyncapi/base.py
@abstractproperty
def name(self) -> str:
    """Returns the name of the API operation."""
    raise NotImplementedError()
schema
Returns the schema of the API operation as a dictionary of channel names and channel objects.
Source code in faststream/asyncapi/base.py
def schema(self) -> Dict[str, Channel]:  # pragma: no cover
    """Returns the schema of the API operation as a dictionary of channel names and channel objects."""
    return {}
start async
start(client: Redis[bytes]) -> None
Source code in faststream/redis/handler.py
@override
async def start(self, client: "Redis[bytes]") -> None:  # type: ignore[override]
    self.started = anyio.Event()

    consume: Union[
        Callable[[], Awaitable[Optional[AnyRedisDict]]],
        Callable[[], Awaitable[Optional[Sequence[AnyRedisDict]]]],
    ]
    sleep: float

    if (list_sub := self.list_sub) is not None:
        sleep = list_sub.polling_interval
        consume = partial(
            self._consume_list_msg,
            client=client,
        )
        self.started.set()

    elif (channel := self.channel) is not None:
        self.subscription = psub = client.pubsub()

        if channel.pattern:
            await psub.psubscribe(channel.name)
        else:
            await psub.subscribe(channel.name)

        consume = partial(
            psub.get_message,
            ignore_subscribe_messages=True,
            timeout=channel.polling_interval,
        )
        sleep = 0.01
        self.started.set()

    elif self.stream_sub is not None:
        consume = partial(  # type: ignore[assignment]
            self._consume_stream_msg,
            client=client,
        )
        sleep = 0.01

    else:
        raise AssertionError("unreachable")

    await super().start()
    self.task = asyncio.create_task(self._consume(consume, sleep))

    # wait until Stream starts to consume
    await anyio.sleep(0.01)
    await self.started.wait()
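Each branch of `start` corresponds to one subscriber type. A sketch of the three forms, assuming the `PubSub`, `ListSub`, and `StreamSub` helpers exported by `faststream.redis` (the names and intervals below are illustrative):

```python
from faststream.redis import ListSub, PubSub, RedisBroker, StreamSub

broker = RedisBroker("redis://localhost:6379")


# Pub/Sub pattern subscription -> the psubscribe()/subscribe() branch
@broker.subscriber(channel=PubSub("logs.*", pattern=True))
async def on_log(msg: str) -> None: ...


# List subscription -> polled by _consume_list_msg at `polling_interval`
@broker.subscriber(list=ListSub("work-queue", polling_interval=0.5))
async def on_task(msg: str) -> None: ...


# Stream subscription -> read by _consume_stream_msg starting at `last_id`
@broker.subscriber(stream=StreamSub("events", last_id="$"))
async def on_event(msg: str) -> None: ...
```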