FakeProducer
faststream.redis.test.FakeProducer
FakeProducer(broker: RedisBroker)
Bases: RedisFastProducer
Source code in faststream/redis/test.py
publish (async)
publish(
message: SendableMessage,
channel: Optional[str] = None,
reply_to: str = "",
headers: Optional[AnyDict] = None,
correlation_id: Optional[str] = None,
*,
list: Optional[str] = None,
stream: Optional[str] = None,
rpc: bool = False,
rpc_timeout: Optional[float] = 30.0,
raise_timeout: bool = False
) -> Optional[Any]
Source code in faststream/redis/test.py
publish_batch (async)
publish_batch(*msgs: SendableMessage, list: str) -> None