Skip to content

TestRedisBroker

faststream.redis.test.TestRedisBroker #

TestRedisBroker(
    broker: Broker,
    with_real: bool = False,
    connect_only: Optional[bool] = None,
)

Bases: TestBroker[RedisBroker]

A class to test Redis brokers.

Initialize a class instance.

PARAMETER DESCRIPTION
broker

An instance of the Broker class.

TYPE: Broker

with_real

Whether to use a real broker.

TYPE: bool DEFAULT: False

connect_only

Whether to only connect to the broker.

TYPE: Optional[bool] DEFAULT: None

Source code in faststream/broker/test.py
def __init__(
    self,
    broker: Broker,
    with_real: bool = False,
    connect_only: Optional[bool] = None,
) -> None:
    """Store the broker under test and resolve the connection mode.

    Args:
        broker: An instance of the Broker class.
        with_real: Whether to use a real broker.
        connect_only: Whether to only connect to the broker. When ``None``,
            the value is taken from ``is_contains_context_name`` called with
            this class's name and ``TestApp``'s name; if that call raises,
            a ``RuntimeWarning`` is issued and ``False`` is used instead.

    """
    self.broker = broker
    self.with_real = with_real

    # An explicit value from the caller always wins; no detection needed.
    if connect_only is not None:
        self.connect_only = connect_only
        return

    try:
        detected = is_contains_context_name(
            self.__class__.__name__,
            TestApp.__name__,
        )

    except Exception as e:  # pragma: no cover
        # TODO: remove with 0.5.0
        report = (
            f"\nError `{e!r}` occurred at `{self.__class__.__name__}` AST parsing"
            "\nPlease, report us by creating an Issue with your TestClient use case"
            "\nhttps://github.com/airtai/faststream/issues/new?labels=bug&template=bug_report.md&title=Bug:%20TestClient%20AST%20parsing"
        )
        warnings.warn(report, category=RuntimeWarning, stacklevel=1)

        # Detection failed: fall back to the non-connect-only mode.
        detected = False

    self.connect_only = detected

broker instance-attribute #

broker = broker

connect_only instance-attribute #

connect_only = connect_only

with_real instance-attribute #

with_real = with_real

create_publisher_fake_subscriber staticmethod #

create_publisher_fake_subscriber(
    broker: RedisBroker, publisher: Publisher
) -> HandlerCallWrapper[Any, Any, Any]
Source code in faststream/redis/test.py
@staticmethod
def create_publisher_fake_subscriber(
    broker: RedisBroker,
    publisher: Publisher,
) -> HandlerCallWrapper[Any, Any, Any]:
    """Register a do-nothing subscriber on the publisher's destination.

    Args:
        broker: Broker on which the subscriber is registered.
        publisher: Publisher whose ``channel``, ``list`` and ``stream``
            values are passed through to ``broker.subscriber``.

    Returns:
        Whatever ``broker.subscriber(...)`` returns when applied to the
        no-op handler.

    """
    # Build the decorator first, then apply it explicitly to the handler.
    register = broker.subscriber(
        channel=publisher.channel,
        list=publisher.list,
        stream=publisher.stream,
        _raw=True,
    )

    def f(msg: Any) -> None:
        pass

    return register(f)

patch_publisher staticmethod #

patch_publisher(
    broker: RedisBroker, publisher: Any
) -> None
Source code in faststream/redis/test.py
@staticmethod
def patch_publisher(
    broker: RedisBroker,
    publisher: Any,
) -> None:
    publisher._producer = broker._producer

remove_publisher_fake_subscriber staticmethod #

remove_publisher_fake_subscriber(
    broker: RedisBroker, publisher: Publisher
) -> None
Source code in faststream/redis/test.py
@staticmethod
def remove_publisher_fake_subscriber(
    broker: RedisBroker,
    publisher: Publisher,
) -> None:
    """Drop the handler registered for the publisher's destination.

    The destination is the first truthy value among the publisher's
    ``channel``, ``list`` and ``stream``. Its routing hash is removed from
    ``broker.handlers``; a missing entry is silently ignored.

    Args:
        broker: Broker whose ``handlers`` mapping is modified.
        publisher: Publisher that supplies the destination.

    """
    destination = publisher.channel or publisher.list or publisher.stream
    assert destination  # nosec B101

    routing_hash = Handler.get_routing_hash(destination)
    broker.handlers.pop(routing_hash, None)