Bases: TestBroker[RedisBroker]
A class to test Redis brokers.
Initialize a class instance.
| PARAMETER | DESCRIPTION |
| --- | --- |
| `broker` | An instance of the Broker class. TYPE: `Broker` |
| `with_real` | Whether to use a real broker. TYPE: `bool` DEFAULT: `False` |
| `connect_only` | Whether to only connect to the broker. TYPE: `Optional[bool]` DEFAULT: `None` |
Source code in faststream/broker/test.py
def __init__(
    self,
    broker: Broker,
    with_real: bool = False,
    connect_only: Optional[bool] = None,
) -> None:
    """Initialize a class instance.

    Args:
        broker: An instance of the Broker class.
        with_real: Whether to use a real broker.
        connect_only: Whether to only connect to the broker. When left as
            ``None``, the value is the result of ``is_contains_context_name``
            called with this class's name and ``TestApp``'s name; if that
            call raises, a ``RuntimeWarning`` is emitted and ``False`` is
            used instead.
    """
    self.with_real = with_real
    self.broker = broker

    if connect_only is None:
        try:
            connect_only = is_contains_context_name(
                self.__class__.__name__,
                TestApp.__name__,
            )
        except Exception as e:  # pragma: no cover
            # Best-effort detection: any failure degrades to a warning and
            # the ``False`` default instead of breaking the test client.
            # TODO: remove with 0.5.0
            warnings.warn(
                (
                    f"\nError `{e!r}` occurred at `{self.__class__.__name__}` AST parsing"
                    "\nPlease, report us by creating an Issue with your TestClient use case"
                    "\nhttps://github.com/airtai/faststream/issues/new?labels=bug&template=bug_report.md&title=Bug:%20TestClient%20AST%20parsing"
                ),
                category=RuntimeWarning,
                # Attribute the warning to the code that constructs the
                # test client rather than to this line of the library.
                stacklevel=2,
            )
            connect_only = False

    self.connect_only = connect_only
broker instance-attribute
connect_only instance-attribute
with_real instance-attribute
create_publisher_fake_subscriber staticmethod
Source code in faststream/redis/test.py
@staticmethod
def create_publisher_fake_subscriber(
    broker: RedisBroker,
    publisher: Publisher,
) -> HandlerCallWrapper[Any, Any, Any]:
    """Register a do-nothing subscriber on the publisher's destination.

    Args:
        broker: Broker whose ``subscriber`` factory registers the handler.
        publisher: Publisher whose ``channel``, ``list`` and ``stream``
            values are forwarded to ``broker.subscriber``.

    Returns:
        The object produced by applying ``broker.subscriber(...)`` to the
        no-op handler.
    """
    # Build the registration decorator before defining the handler, which
    # is the same order decorator syntax would evaluate things in.
    register = broker.subscriber(
        channel=publisher.channel,
        list=publisher.list,
        stream=publisher.stream,
        _raw=True,
    )

    def f(msg: Any) -> None:
        # No-op: the handler does nothing with the message.
        pass

    return register(f)
patch_publisher staticmethod
Source code in faststream/redis/test.py
@staticmethod
def patch_publisher(
    broker: RedisBroker,
    publisher: Any,
) -> None:
    """Make the publisher use the broker's producer.

    Args:
        broker: Broker whose ``_producer`` is read.
        publisher: Object whose ``_producer`` attribute is overwritten.
    """
    shared_producer = broker._producer
    publisher._producer = shared_producer
remove_publisher_fake_subscriber staticmethod
Source code in faststream/redis/test.py
@staticmethod
def remove_publisher_fake_subscriber(
    broker: RedisBroker,
    publisher: Publisher,
) -> None:
    """Drop the handler registered for the publisher's destination.

    The destination is the first truthy value among ``publisher.channel``,
    ``publisher.list`` and ``publisher.stream``. A missing handler entry is
    ignored.

    Args:
        broker: Broker whose ``handlers`` mapping is modified.
        publisher: Publisher whose destination identifies the handler.
    """
    destination = publisher.channel or publisher.list or publisher.stream
    assert destination  # nosec B101

    # Look up ``pop`` before computing the hash so attribute access and the
    # hash call happen in the same order as a single combined expression.
    discard_handler = broker.handlers.pop
    routing_key = Handler.get_routing_hash(destination)
    discard_handler(routing_key, None)