Skip to content

Baggage

faststream.opentelemetry.Baggage #

Baggage(payload, batch_payload=None)
Source code in faststream/opentelemetry/baggage.py
def __init__(
    self, payload: "AnyDict", batch_payload: Optional[List["AnyDict"]] = None
) -> None:
    """Store private copies of the baggage payload(s).

    Args:
        payload: Key/value pairs forming the current baggage; copied with
            ``dict(...)`` so the caller's object is not retained.
        batch_payload: Optional sequence of per-message baggage mappings.
            Each entry is copied individually; ``None`` or an empty
            sequence results in an empty list.
    """
    self._baggage = dict(payload)

    if batch_payload:
        copies = [dict(entry) for entry in batch_payload]
    else:
        copies = []
    self._batch_baggage = copies

get_all #

get_all()

Get a copy of the current baggage.

Source code in faststream/opentelemetry/baggage.py
def get_all(self) -> "AnyDict":
    """Return a new dict holding every current baggage entry.

    The result is a shallow copy: adding or removing keys on it does not
    affect the stored baggage.
    """
    snapshot = dict(self._baggage)
    return snapshot

get_all_batch #

get_all_batch()

Get a copy of all batch baggage, if any exists.

Source code in faststream/opentelemetry/baggage.py
def get_all_batch(self) -> List["AnyDict"]:
    """Return an independent copy of the per-message batch baggage.

    Returns:
        A new list containing a new dict for every stored batch entry, in
        the stored order. The list is empty when no batch baggage is held.
    """
    # Copy each entry, not just the outer list: a plain ``list.copy()``
    # would hand out the internal dicts, letting callers mutate stored state.
    return [dict(entry) for entry in self._batch_baggage]

get #

get(key)

Get a value from the baggage by key.

Source code in faststream/opentelemetry/baggage.py
def get(self, key: str) -> Optional[Any]:
    """Look up a single baggage value.

    Args:
        key: Name of the baggage entry.

    Returns:
        The stored value, or ``None`` when the key is not present.
    """
    store = self._baggage
    return store[key] if key in store else None

remove #

remove(key)

Remove a baggage item by key.

Source code in faststream/opentelemetry/baggage.py
def remove(self, key: str) -> None:
    """Delete one entry from the current baggage.

    Args:
        key: Name of the entry to drop. A key that is not present is
            ignored; no error is raised.
    """
    store = self._baggage
    if key in store:
        del store[key]

set #

set(key, value)

Set a key-value pair in the baggage.

Source code in faststream/opentelemetry/baggage.py
def set(self, key: str, value: Any) -> None:
    """Store a value in the current baggage under the given key.

    Args:
        key: Name of the baggage entry.
        value: Value to associate with ``key``; replaces any existing value.
    """
    self._baggage.update({key: value})

clear #

clear()

Clear the current baggage.

Source code in faststream/opentelemetry/baggage.py
def clear(self) -> None:
    """Remove every entry from the current baggage.

    The existing dict is emptied in place. The batch baggage list is not
    touched by this method.
    """
    store = self._baggage
    store.clear()

to_headers #

to_headers(headers=None)

Convert baggage items to headers format for propagation.

Source code in faststream/opentelemetry/baggage.py
def to_headers(self, headers: Optional["AnyDict"] = None) -> "AnyDict":
    """Write the current baggage into a headers mapping for propagation.

    Args:
        headers: Mapping to inject into. When ``None``, a fresh dict is
            created.

    Returns:
        The same mapping that was passed in (or the newly created one)
        after it has been handed to ``_BAGGAGE_PROPAGATOR.inject``.
    """
    carrier = {} if headers is None else headers

    # Start from the ambient context and layer each baggage entry on top;
    # ``set_baggage`` returns a new context, so rebind on every step.
    ctx = context.get_current()
    for name, value in self._baggage.items():
        ctx = baggage.set_baggage(name, value, context=ctx)

    _BAGGAGE_PROPAGATOR.inject(carrier, ctx)
    return carrier

from_msg classmethod #

from_msg(msg)

Create a Baggage instance from a StreamMessage.

Source code in faststream/opentelemetry/baggage.py
@classmethod
def from_msg(cls, msg: "StreamMessage[Any]") -> Self:
    """Build a Baggage instance from the headers carried by a message.

    When ``msg.batch_headers`` has at most one entry, baggage is extracted
    from ``msg.headers`` only and no batch baggage is recorded. Otherwise
    baggage is extracted from every entry of ``msg.batch_headers``: the
    per-entry results become the batch baggage, and their merge (later
    entries overriding earlier ones on equal keys) becomes the current
    baggage.

    Args:
        msg: Message providing ``headers`` and ``batch_headers``.

    Returns:
        A new instance of ``cls`` populated as described above.
    """
    if len(msg.batch_headers) > 1:
        per_message: List["AnyDict"] = [
            cast("AnyDict", baggage.get_all(_BAGGAGE_PROPAGATOR.extract(item)))
            for item in msg.batch_headers
        ]

        # Fold the per-message baggage in order so later messages win.
        merged: "AnyDict" = {}
        for entry in per_message:
            merged.update(entry)

        return cls(merged, per_message)

    single = baggage.get_all(_BAGGAGE_PROPAGATOR.extract(msg.headers))
    return cls(cast("AnyDict", single))