Skip to content

telemetry_attributes_provider_factory

faststream.nats.opentelemetry.provider.telemetry_attributes_provider_factory

telemetry_attributes_provider_factory(msg: Optional[Msg]) -> NatsTelemetrySettingsProvider
telemetry_attributes_provider_factory(msg: Sequence[Msg]) -> NatsBatchTelemetrySettingsProvider
telemetry_attributes_provider_factory(msg: Union[Msg, Sequence[Msg], None]) -> Union[NatsTelemetrySettingsProvider, NatsBatchTelemetrySettingsProvider]
telemetry_attributes_provider_factory(msg)
Source code in faststream/nats/opentelemetry/provider.py
def telemetry_attributes_provider_factory(
    msg: Union["Msg", Sequence["Msg"], None],
) -> Union[
    NatsTelemetrySettingsProvider,
    NatsBatchTelemetrySettingsProvider,
    None,
]:
    """Pick the telemetry settings provider that matches the kind of ``msg``.

    Args:
        msg: A single ``Msg``, a sequence of messages, or ``None``.

    Returns:
        A new ``NatsBatchTelemetrySettingsProvider`` when ``msg`` is a
        ``Sequence``; a new ``NatsTelemetrySettingsProvider`` when ``msg``
        is a ``Msg`` instance or ``None``; ``None`` for any other object.
    """
    # Batch input is checked first: any Sequence gets the batch provider.
    if isinstance(msg, Sequence):
        return NatsBatchTelemetrySettingsProvider()

    # A lone message, or no message at all, gets the single-message provider.
    if msg is None or isinstance(msg, Msg):
        return NatsTelemetrySettingsProvider()

    # KeyValue and Object Storage watch cases
    return None