async def publish_scope(
    self,
    call_next: "AsyncFunc",
    msg: Any,
    *args: Any,
    **kwargs: Any,
) -> Any:
    """Wrap a publish call in a PRODUCER span and record publish metrics.

    When no settings provider is configured the call is forwarded untouched.
    Otherwise baggage/trace entries are written into a private copy of the
    outgoing headers, ``call_next`` is awaited inside a "publish" span, and
    the elapsed time is reported through ``self._metrics.observe_publish``.

    Args:
        call_next: Next callable in the publish chain. It is awaited with the
            message(s), the enriched ``headers`` and the remaining kwargs.
        msg: First message to publish.
        *args: Additional messages; more than one message in total is
            handled as a batch.
        **kwargs: Publish options. ``headers`` is taken out, copied and
            re-passed; the remaining options go to the settings provider
            and to ``call_next``.

    Returns:
        Whatever ``call_next`` returns.

    Raises:
        Exception: Any error raised while publishing is re-raised after its
            type name has been stored in the metrics attributes.
    """
    if (provider := self.__settings_provider) is None:
        return await call_next(msg, *args, **kwargs)

    # Work on a private copy: the propagators below write into this mapping,
    # and the caller's own dict must not pick up trace/baggage entries that
    # would then leak into later publishes reusing the same object.
    headers = dict(kwargs.pop("headers", None) or {})
    current_context = context.get_current()
    destination_name = provider.get_publish_destination_name(kwargs)

    current_baggage: Optional[Baggage] = fs_context.get_local("baggage")
    if current_baggage:
        headers.update(current_baggage.to_headers())

    trace_attributes = provider.get_publish_attrs_from_kwargs(kwargs)
    metrics_attributes = {
        SpanAttributes.MESSAGING_SYSTEM: provider.messaging_system,
        SpanAttributes.MESSAGING_DESTINATION_NAME: destination_name,
    }

    # NOTE: if batch with single message?
    msg_count = 1 + len(args)
    if msg_count > 1:
        trace_attributes[SpanAttributes.MESSAGING_BATCH_MESSAGE_COUNT] = msg_count
        # Flag the batch in the propagated baggage, on top of whatever
        # baggage the headers already carry.
        current_context = _BAGGAGE_PROPAGATOR.extract(headers, current_context)
        _BAGGAGE_PROPAGATOR.inject(
            headers, baggage.set_baggage(WITH_BATCH, True, context=current_context)
        )

    if self._current_span and self._current_span.is_recording():
        # A recording span is already attached to this middleware: parent the
        # publish span to it, but propagate the stored origin context.
        current_context = trace.set_span_in_context(
            self._current_span, current_context
        )
        _TRACE_PROPAGATOR.inject(headers, context=self._origin_context)
    else:
        # No recording span: open a short-lived "create" span, propagate it
        # in the headers, and end it before publishing.
        create_span = self._tracer.start_span(
            name=_create_span_name(destination_name, MessageAction.CREATE),
            kind=trace.SpanKind.PRODUCER,
            attributes=trace_attributes,
        )
        current_context = trace.set_span_in_context(create_span)
        _TRACE_PROPAGATOR.inject(headers, context=current_context)
        create_span.end()

    start_time = time.perf_counter()
    try:
        with self._tracer.start_as_current_span(
            name=_create_span_name(destination_name, MessageAction.PUBLISH),
            kind=trace.SpanKind.PRODUCER,
            attributes=trace_attributes,
            context=current_context,
        ) as span:
            span.set_attribute(
                SpanAttributes.MESSAGING_OPERATION, MessageAction.PUBLISH
            )
            result = await call_next(msg, *args, headers=headers, **kwargs)
    except Exception as e:
        metrics_attributes[ERROR_TYPE] = type(e).__name__
        raise
    finally:
        # Metrics are reported for successful and failed publishes alike.
        duration = time.perf_counter() - start_time
        self._metrics.observe_publish(metrics_attributes, duration, msg_count)

    # NOTE(review): the scope tokens are reset only after a successful
    # publish; confirm whether the failure path is expected to reset them too.
    for key, token in self._scope_tokens:
        fs_context.reset_local(key, token)

    return result