FakeProducer
faststream.redis.testing.FakeProducer
Bases: RedisFastProducer
Source code in faststream/redis/testing.py
publish (async)
publish(
message,
*,
channel=None,
list=None,
stream=None,
maxlen=None,
headers=None,
reply_to="",
correlation_id=None,
rpc=False,
rpc_timeout=30.0,
raise_timeout=False,
)
Source code in faststream/redis/testing.py
request (async)
request(
message,
*,
correlation_id,
channel=None,
list=None,
stream=None,
maxlen=None,
headers=None,
timeout=30.0,
)