Skip to content

RedisFastProducer

faststream.redis.producer.RedisFastProducer #

RedisFastProducer(
    connection: Redis[bytes],
    parser: Union[
        None,
        AsyncCustomParser[OneMessage, OneRedisMessage],
        AsyncCustomParser[BatchMessage, BatchRedisMessage],
    ],
    decoder: Union[
        None,
        AsyncCustomDecoder[OneRedisMessage],
        AsyncCustomDecoder[BatchRedisMessage],
    ],
)

A class to represent a Redis producer.

Initialize the Redis producer.

PARAMETER DESCRIPTION
connection

The Redis connection.

TYPE: Redis[bytes]

parser

The parser.

TYPE: Union[None, AsyncCustomParser[OneMessage, OneRedisMessage], AsyncCustomParser[BatchMessage, BatchRedisMessage]]

decoder

The decoder.

TYPE: Union[None, AsyncCustomDecoder[OneRedisMessage], AsyncCustomDecoder[BatchRedisMessage]]

Source code in faststream/redis/producer.py
def __init__(
    self,
    connection: "Redis[bytes]",
    parser: Union[
        None,
        AsyncCustomParser[OneMessage, OneRedisMessage],
        AsyncCustomParser[BatchMessage, BatchRedisMessage],
    ],
    decoder: Union[
        None,
        AsyncCustomDecoder[OneRedisMessage],
        AsyncCustomDecoder[BatchRedisMessage],
    ],
) -> None:
    """Initialize the Redis producer.

    Args:
        connection: The Redis connection.
        parser: The parser.
        decoder: The decoder.
    """
    self._connection = connection
    self._parser = resolve_custom_func(
        parser,  # type: ignore[arg-type,assignment]
        RedisParser.parse_message,
    )
    self._decoder = resolve_custom_func(decoder, RedisParser.decode_message)

publish async #

publish(
    message: SendableMessage,
    channel: Optional[str] = None,
    reply_to: str = "",
    headers: Optional[AnyDict] = None,
    correlation_id: Optional[str] = None,
    *,
    list: Optional[str] = None,
    stream: Optional[str] = None,
    rpc: bool = False,
    rpc_timeout: Optional[float] = 30.0,
    raise_timeout: bool = False
) -> Optional[Any]
Source code in faststream/redis/producer.py
async def publish(
    self,
    message: SendableMessage,
    channel: Optional[str] = None,
    reply_to: str = "",
    headers: Optional[AnyDict] = None,
    correlation_id: Optional[str] = None,
    *,
    list: Optional[str] = None,
    stream: Optional[str] = None,
    rpc: bool = False,
    rpc_timeout: Optional[float] = 30.0,
    raise_timeout: bool = False,
) -> Optional[Any]:
    if not any((channel, list, stream)):
        raise ValueError(INCORRECT_SETUP_MSG)

    psub: Optional["PubSub"] = None
    if rpc is True:
        if reply_to:
            raise WRONG_PUBLISH_ARGS

        reply_to = str(uuid4())
        psub = self._connection.pubsub()
        await psub.subscribe(reply_to)

    msg = RawMessage.encode(
        message=message,
        reply_to=reply_to,
        headers=headers,
        correlation_id=correlation_id,
    )

    if channel is not None:
        await self._connection.publish(channel, msg)
    elif list is not None:
        await self._connection.rpush(list, msg)
    elif stream is not None:
        await self._connection.xadd(stream, {DATA_KEY: msg})
    else:
        raise AssertionError("unreachable")

    if psub is None:
        return None

    else:
        m = None
        with timeout_scope(rpc_timeout, raise_timeout):
            # skip subscribe message
            await psub.get_message(
                ignore_subscribe_messages=True,
                timeout=rpc_timeout or 0.0,
            )

            # get real response
            m = await psub.get_message(
                ignore_subscribe_messages=True,
                timeout=rpc_timeout or 0.0,
            )

        await psub.unsubscribe()
        await psub.aclose()  # type: ignore[attr-defined]

        if m is None:
            if raise_timeout:
                raise TimeoutError()
            else:
                return None
        else:
            return await self._decoder(await self._parser(m))

publish_batch async #

publish_batch(*msgs: SendableMessage, list: str) -> None
Source code in faststream/redis/producer.py
async def publish_batch(
    self,
    *msgs: SendableMessage,
    list: str,
) -> None:
    batch = (encode_message(msg)[0] for msg in msgs)
    await self._connection.rpush(list, *batch)