Skip to content

ConfluentFastConfig

`faststream.confluent.config.ConfluentFastConfig`

ConfluentFastConfig(config)
Source code in faststream/confluent/config.py
def __init__(self, config: Optional[ConfluentConfig]) -> None:
    """Store the optional configuration on the instance.

    Args:
        config: Configuration object to keep, or ``None`` when no
            configuration is supplied. It is stored as-is, without
            copying or validation.
    """
    self.config: Optional[ConfluentConfig] = config

`config` (instance attribute)

config = config

`as_config_dict`

as_config_dict()
Source code in faststream/confluent/config.py
def as_config_dict(self) -> "AnyDict":
    """Return the stored configuration as a plain dictionary.

    A falsy ``self.config`` (``None`` or empty) yields an empty dict.
    Otherwise the configuration is shallow-copied into a new ``dict``, and
    every enum-backed option present in it is normalized: the value is
    passed through the option's enum class and replaced by the resulting
    member's ``.value``.

    Returns:
        A new dictionary; ``self.config`` itself is not modified.

    Raises:
        ValueError: Presumably raised by the enum lookup when an option
            holds a value its enum class does not accept (standard
            ``Enum`` call behavior — confirm against the enum definitions).
    """
    if not self.config:
        return {}

    # Option keys whose values must be coerced through an enum class.
    enum_backed_options = (
        ("compression.codec", CompressionCodec),
        ("compression.type", CompressionType),
        ("client.dns.lookup", ClientDNSLookup),
        ("offset.store.method", OffsetStoreMethod),
        ("isolation.level", IsolationLevel),
        ("sasl.oauthbearer.method", SASLOAUTHBearerMethod),
        ("security.protocol", SecurityProtocol),
        ("broker.address.family", BrokerAddressFamily),
        ("builtin.features", BuiltinFeatures),
        ("debug", Debug),
        ("group.protocol", GroupProtocol),
    )

    settings = dict(self.config)

    for option, enum_cls in enum_backed_options:
        if option not in settings:
            continue
        # Round-trip through the enum class, then store the member's value.
        member = enum_cls(settings[option])
        settings[option] = member.value

    return settings