Skip to content

BaseTelemetryMiddleware

faststream.opentelemetry.middleware.BaseTelemetryMiddleware #

BaseTelemetryMiddleware(*, tracer, settings_provider_factory, metrics_container, msg=None)

Bases: BaseMiddleware

Source code in faststream/opentelemetry/middleware.py
def __init__(
    self,
    *,
    tracer: "Tracer",
    settings_provider_factory: Callable[
        [Any], Optional[TelemetrySettingsProvider[Any]]
    ],
    metrics_container: _MetricsContainer,
    msg: Optional[Any] = None,
) -> None:
    """Store the telemetry collaborators and initialise per-message state.

    Args:
        tracer: Tracer the scopes use to start spans.
        settings_provider_factory: Called once with ``msg``; the result is
            kept as the settings provider and may be ``None``.
        metrics_container: Object the scopes report their metrics to.
        msg: Message this middleware instance is bound to, if any.
    """
    self.msg = msg

    # Collaborators handed in by the caller.
    self._metrics = metrics_container
    self._tracer = tracer

    # Per-message tracing state; filled in later by ``consume_scope``.
    self._scope_tokens: List[Tuple[str, Token[Any]]] = []
    self._origin_context: Optional[Context] = None
    self._current_span: Optional[Span] = None

    # Resolved last, exactly once, from the bound message.
    self.__settings_provider = settings_provider_factory(msg)

msg instance-attribute #

msg = msg

on_receive async #

on_receive()

Hook to call on message receive.

Source code in faststream/broker/middlewares/base.py
async def on_receive(self) -> None:
    """Run when a message is received; this base hook does nothing."""
    return None

on_consume async #

on_consume(msg)

Asynchronously consumes a message.

Source code in faststream/broker/middlewares/base.py
async def on_consume(self, msg: "StreamMessage[Any]") -> "StreamMessage[Any]":
    """Return the message to be consumed; this base hook passes it through.

    Args:
        msg: The incoming message.

    Returns:
        The same ``msg`` object, unchanged.
    """
    consumed = msg
    return consumed

after_consume async #

after_consume(err)

Called after a message has been consumed; re-raises the error if one is passed.

Source code in faststream/broker/middlewares/base.py
async def after_consume(self, err: Optional[Exception]) -> None:
    """Finish the consume step, propagating a failure if there was one.

    Args:
        err: Exception captured while consuming, or ``None`` on success.

    Raises:
        Exception: ``err`` itself, whenever it is not ``None``.
    """
    if err is None:
        return
    raise err

on_publish async #

on_publish(msg, *args, **kwargs)

Asynchronously handle a publish event.

Source code in faststream/broker/middlewares/base.py
async def on_publish(self, msg: Any, *args: Any, **kwargs: Any) -> Any:
    """Return the message to be published; this base hook passes it through.

    Args:
        msg: The outgoing message.
        *args: Extra positional publish arguments (ignored here).
        **kwargs: Extra keyword publish arguments (ignored here).

    Returns:
        The same ``msg`` object, unchanged.
    """
    outgoing = msg
    return outgoing

after_publish async #

after_publish(err)

Called after a message has been published; re-raises the error if one is passed.

Source code in faststream/broker/middlewares/base.py
async def after_publish(self, err: Optional[Exception]) -> None:
    """Finish the publish step, propagating a failure if there was one.

    Args:
        err: Exception captured while publishing, or ``None`` on success.

    Raises:
        Exception: ``err`` itself, whenever it is not ``None``.
    """
    if err is None:
        return
    raise err

publish_scope async #

publish_scope(call_next, msg, *args, **kwargs)
Source code in faststream/opentelemetry/middleware.py
async def publish_scope(
    self,
    call_next: "AsyncFunc",
    msg: Any,
    *args: Any,
    **kwargs: Any,
) -> Any:
    """Publish ``msg`` inside a PRODUCER span and record publish metrics.

    When no settings provider is configured the call is forwarded untouched.
    Otherwise trace headers (and baggage headers, if a baggage is stored in
    the local context) are injected into the outgoing headers, the downstream
    call runs inside a "publish" span, and the elapsed time is reported to the
    metrics container whether or not the call raises.

    Args:
        call_next: The next publish callable in the middleware chain.
        msg: First message to publish.
        *args: Further messages; more than one message in total is treated
            as a batch.
        **kwargs: Publish options. ``headers`` is read from here and passed
            on as a new dict; the caller's dict is not modified.

    Returns:
        Whatever ``call_next`` returns.

    Raises:
        Exception: Anything raised by ``call_next``; the exception type name
            is added to the metrics attributes before it is re-raised.
    """
    if (provider := self.__settings_provider) is None:
        return await call_next(msg, *args, **kwargs)

    # Copy the headers: they are updated and injected into below, and the
    # caller's dict must not pick up tracing keys as a side effect.
    headers = dict(kwargs.pop("headers", None) or {})
    current_context = context.get_current()
    destination_name = provider.get_publish_destination_name(kwargs)

    current_baggage: Optional[Baggage] = fs_context.get_local("baggage")
    if current_baggage:
        headers.update(current_baggage.to_headers())

    trace_attributes = provider.get_publish_attrs_from_kwargs(kwargs)
    metrics_attributes = {
        SpanAttributes.MESSAGING_SYSTEM: provider.messaging_system,
        SpanAttributes.MESSAGING_DESTINATION_NAME: destination_name,
    }

    # NOTE: if batch with single message?
    msg_count = 1 + len(args)
    if msg_count > 1:
        trace_attributes[SpanAttributes.MESSAGING_BATCH_MESSAGE_COUNT] = msg_count
        # Flag the outgoing headers as a batch via baggage.
        current_context = _BAGGAGE_PROPAGATOR.extract(headers, current_context)
        _BAGGAGE_PROPAGATOR.inject(
            headers, baggage.set_baggage(WITH_BATCH, True, context=current_context)
        )

    if self._current_span and self._current_span.is_recording():
        # Publishing from within a consume scope: parent the publish span on
        # the still-open process span and propagate the origin context.
        current_context = trace.set_span_in_context(
            self._current_span, current_context
        )
        _TRACE_PROPAGATOR.inject(headers, context=self._origin_context)

    else:
        # No active process span: open and immediately end a "create" span
        # so that the headers carry a trace context.
        create_span = self._tracer.start_span(
            name=_create_span_name(destination_name, MessageAction.CREATE),
            kind=trace.SpanKind.PRODUCER,
            attributes=trace_attributes,
        )
        current_context = trace.set_span_in_context(create_span)
        _TRACE_PROPAGATOR.inject(headers, context=current_context)
        create_span.end()

    start_time = time.perf_counter()

    try:
        with self._tracer.start_as_current_span(
            name=_create_span_name(destination_name, MessageAction.PUBLISH),
            kind=trace.SpanKind.PRODUCER,
            attributes=trace_attributes,
            context=current_context,
        ) as span:
            span.set_attribute(
                SpanAttributes.MESSAGING_OPERATION, MessageAction.PUBLISH
            )
            result = await call_next(msg, *args, headers=headers, **kwargs)

    except Exception as e:
        metrics_attributes[ERROR_TYPE] = type(e).__name__
        raise

    finally:
        duration = time.perf_counter() - start_time
        self._metrics.observe_publish(metrics_attributes, duration, msg_count)

    # Drain the token list so each token is handed back at most once, even if
    # this scope runs again on the same instance.
    # NOTE(review): presumably these are contextvars tokens, which cannot be
    # reset twice -- confirm against fs_context.reset_local.
    pending_tokens, self._scope_tokens = self._scope_tokens, []
    for key, token in pending_tokens:
        fs_context.reset_local(key, token)

    return result

consume_scope async #

consume_scope(call_next, msg)
Source code in faststream/opentelemetry/middleware.py
async def consume_scope(
    self,
    call_next: "AsyncFuncAny",
    msg: "StreamMessage[Any]",
) -> Any:
    """Run the consumer inside a CONSUMER "process" span and record metrics.

    When no settings provider is configured the call is forwarded untouched.
    Otherwise the trace context is taken from the message headers (batch
    messages start from an empty context and carry span links instead), the
    downstream call runs with the process span attached as the current
    context, and the elapsed time is reported to the metrics container
    whether or not the call raises.

    The process span is opened with ``end_on_exit=False`` and kept on
    ``self._current_span``; ``after_processed`` ends it.

    Args:
        call_next: The next consume callable in the middleware chain.
        msg: The message being consumed.

    Returns:
        Whatever ``call_next`` returns.

    Raises:
        Exception: Anything raised by ``call_next``; the exception type name
            is added to the metrics attributes before it is re-raised.
    """
    if (provider := self.__settings_provider) is None:
        return await call_next(msg)

    if _is_batch_message(msg):
        links = _get_msg_links(msg)
        current_context = Context()
    else:
        links = None
        current_context = _TRACE_PROPAGATOR.extract(msg.headers)

    destination_name = provider.get_consume_destination_name(msg)
    trace_attributes = provider.get_consume_attrs_from_message(msg)
    metrics_attributes = {
        SpanAttributes.MESSAGING_SYSTEM: provider.messaging_system,
        MESSAGING_DESTINATION_PUBLISH_NAME: destination_name,
    }

    if not len(current_context):
        # Nothing was propagated: open and immediately end a "create" span
        # so the process span below has a parent context.
        create_span = self._tracer.start_span(
            name=_create_span_name(destination_name, MessageAction.CREATE),
            kind=trace.SpanKind.CONSUMER,
            attributes=trace_attributes,
            links=links,
        )
        current_context = trace.set_span_in_context(create_span)
        create_span.end()

    self._origin_context = current_context
    start_time = time.perf_counter()

    try:
        with self._tracer.start_as_current_span(
            name=_create_span_name(destination_name, MessageAction.PROCESS),
            kind=trace.SpanKind.CONSUMER,
            context=current_context,
            attributes=trace_attributes,
            end_on_exit=False,
        ) as span:
            span.set_attribute(
                SpanAttributes.MESSAGING_OPERATION, MessageAction.PROCESS
            )
            self._current_span = span

            # Expose the span and baggage through the local context; the
            # tokens are kept so they can be reset later.
            self._scope_tokens.append(("span", fs_context.set_local("span", span)))
            self._scope_tokens.append(
                ("baggage", fs_context.set_local("baggage", Baggage.from_msg(msg)))
            )

            new_context = trace.set_span_in_context(span, current_context)
            token = context.attach(new_context)
            try:
                result = await call_next(msg)
            finally:
                # Keep attach/detach balanced on the error path as well.
                context.detach(token)

    except Exception as e:
        metrics_attributes[ERROR_TYPE] = type(e).__name__
        raise

    finally:
        duration = time.perf_counter() - start_time
        msg_count = trace_attributes.get(
            SpanAttributes.MESSAGING_BATCH_MESSAGE_COUNT, 1
        )
        self._metrics.observe_consume(metrics_attributes, duration, msg_count)

    return result

after_processed async #

after_processed(exc_type=None, exc_val=None, exc_tb=None)
Source code in faststream/opentelemetry/middleware.py
async def after_processed(
    self,
    exc_type: Optional[Type[BaseException]] = None,
    exc_val: Optional[BaseException] = None,
    exc_tb: Optional["TracebackType"] = None,
) -> Optional[bool]:
    """End the stored span if it is still recording.

    Args:
        exc_type: Type of the exception raised during processing, if any.
        exc_val: The exception instance, if any.
        exc_tb: Traceback of the exception, if any.

    Returns:
        Always ``False``.
    """
    span = self._current_span
    still_open = span and span.is_recording()
    if still_open:
        span.end()
    return False