Skip to content

parse_security

faststream.confluent.security.parse_security #

parse_security(security)
Source code in faststream/confluent/security.py
def parse_security(security: Optional[BaseSecurity]) -> "AnyDict":
    """Convert a security object into a settings dict for the Confluent client.

    The work is delegated to the ``_parse_*`` helper that matches the
    concrete type of ``security``.

    Args:
        security: The security settings to convert, or ``None`` when no
            security is configured.

    Returns:
        An empty dict when ``security`` is ``None``; otherwise the value
        returned by the helper selected for the type of ``security``.

    Raises:
        SetupError: If ``security.ssl_context`` is an ``ssl.SSLContext``
            instance.
        NotImplementedError: If ``security`` is not one of the handled types.
    """
    if security is None:
        return {}

    # Use getattr so that an unsupported object lacking `ssl_context` falls
    # through to the NotImplementedError below instead of raising an
    # unrelated AttributeError here.
    if isinstance(getattr(security, "ssl_context", None), ssl.SSLContext):
        raise SetupError(
            "ssl_context is not supported by confluent-kafka-python, please use config instead."
        )

    if isinstance(security, SASLPlaintext):
        return _parse_sasl_plaintext(security)
    if isinstance(security, SASLScram256):
        return _parse_sasl_scram256(security)
    if isinstance(security, SASLScram512):
        return _parse_sasl_scram512(security)
    if isinstance(security, SASLOAuthBearer):
        return _parse_sasl_oauthbearer(security)
    if isinstance(security, SASLGSSAPI):
        return _parse_sasl_gssapi(security)
    # The generic BaseSecurity check stays last so the more specific branches
    # above are tried first (presumably the SASL classes derive from
    # BaseSecurity — confirm against their definitions).
    if isinstance(security, BaseSecurity):
        return _parse_base_security(security)

    raise NotImplementedError(f"KafkaBroker does not support `{type(security)}`.")