Skip to content

ant_ai.observer.obs

obs module-attribute

The global observability singleton.

Import and use this directly to emit events, record exceptions, and open spans.

ObservabilitySingleton

Global observability singleton.

Thread-safe and async-safe: contextvar state is task-local and restored on exit from bind(), so concurrent invocations never bleed into each other.

Source code in src/ant_ai/observer/obs.py
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
class ObservabilitySingleton:
    """Global observability singleton.

    Thread-safe and async-safe: contextvar state is task-local and restored
    on exit from `bind()`, so concurrent invocations never bleed into each other.
    """

    _sink: Any = None
    _ctx: ContextVar[dict[str, Any]] = ContextVar("obs_ctx", default={})  # noqa: B039

    def configure(self, sink: Any) -> None:
        """Set the active sink. Pass None to disable observability.

        Args:
            sink: An `ObservabilitySink` implementation, or None to disable.
        """
        self._sink = sink

    @contextmanager
    def bind(self, **fields: Any):
        """Merge fields into the current task's observability context.

        The context is restored on exit, so calls nest safely and concurrent
        invocations stay independent. All `event` and `span` calls made inside
        the block automatically include these fields.

        Args:
            **fields: Key-value pairs to add to the current context.
        """
        token = self._ctx.set({**self._ctx.get(), **fields})
        try:
            yield
        finally:
            self._ctx.reset(token)

    async def event(self, name: str, **fields: Any) -> None:
        """Emit a named lifecycle event with structured metadata.

        Context fields bound via `bind()` are merged in automatically.
        Never raises — sink errors are swallowed to protect the runtime.

        Args:
            name: Dot-namespaced event name (e.g. `step.start`).
            **fields: Additional metadata to attach to the event.
        """
        if self._sink is None:
            return
        with suppress(Exception):
            await self._sink.event(name, **{**self._ctx.get(), **fields})

    async def exception(self, name: str, error: Exception, **fields: Any) -> None:
        """Emit a named error event carrying the exception instance.

        Context fields bound via `bind()` are merged in automatically.
        Never raises — sink errors are swallowed to protect the runtime.

        Args:
            name: Dot-namespaced event name (e.g. `step.error`).
            error: The exception to record.
            **fields: Additional metadata to attach to the event.
        """
        if self._sink is None:
            return
        with suppress(Exception):
            await self._sink.exception(name, error, **{**self._ctx.get(), **fields})

    def span(self, name: str, **attrs: Any) -> AbstractAsyncContextManager[Any]:
        """Return an async context manager representing a unit of work.

        Returns a no-op context manager when no sink is configured.
        Context fields bound via `bind()` are merged into the span attributes.

        Args:
            name: Operation name for the span (e.g. `llm`, `tool`).
            **attrs: Additional attributes to attach to the span.

        Returns:
            An async context manager that opens and closes the span.
        """
        if self._sink is None:
            return _noop_span()
        return self._sink.span(name, **{**self._ctx.get(), **attrs})

    def propagation_headers(self) -> dict[str, str]:
        """Return headers encoding the current trace context for outbound requests."""
        if self._sink is None:
            return {}
        with suppress(Exception):
            return self._sink.propagation_headers()
        return {}

    @contextmanager
    def attach_propagation_context(self, headers: dict[str, str]) -> Iterator[None]:
        """Context manager that restores a remote trace context for the duration of the block."""
        if self._sink is None:
            yield
            return
        try:
            cm = self._sink.attach_propagation_context(headers)
        except Exception:
            yield
            return
        with cm:
            yield

configure

configure(sink: Any) -> None

Set the active sink. Pass None to disable observability.

Parameters:

Name Type Description Default
sink Any

An ObservabilitySink implementation, or None to disable.

required
Source code in src/ant_ai/observer/obs.py
24
25
26
27
28
29
30
def configure(self, sink: Any) -> None:
    """Set the active sink. Pass None to disable observability.

    Args:
        sink: An `ObservabilitySink` implementation, or None to disable.
    """
    self._sink = sink

bind

bind(**fields: Any)

Merge fields into the current task's observability context.

The context is restored on exit, so calls nest safely and concurrent invocations stay independent. All event and span calls made inside the block automatically include these fields.

Parameters:

Name Type Description Default
**fields Any

Key-value pairs to add to the current context.

{}
Source code in src/ant_ai/observer/obs.py
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
@contextmanager
def bind(self, **fields: Any):
    """Merge fields into the current task's observability context.

    The context is restored on exit, so calls nest safely and concurrent
    invocations stay independent. All `event` and `span` calls made inside
    the block automatically include these fields.

    Args:
        **fields: Key-value pairs to add to the current context.
    """
    token = self._ctx.set({**self._ctx.get(), **fields})
    try:
        yield
    finally:
        self._ctx.reset(token)

event async

event(name: str, **fields: Any) -> None

Emit a named lifecycle event with structured metadata.

Context fields bound via bind() are merged in automatically. Never raises — sink errors are swallowed to protect the runtime.

Parameters:

Name Type Description Default
name str

Dot-namespaced event name (e.g. step.start).

required
**fields Any

Additional metadata to attach to the event.

{}
Source code in src/ant_ai/observer/obs.py
49
50
51
52
53
54
55
56
57
58
59
60
61
62
async def event(self, name: str, **fields: Any) -> None:
    """Emit a named lifecycle event with structured metadata.

    Context fields bound via `bind()` are merged in automatically.
    Never raises — sink errors are swallowed to protect the runtime.

    Args:
        name: Dot-namespaced event name (e.g. `step.start`).
        **fields: Additional metadata to attach to the event.
    """
    if self._sink is None:
        return
    with suppress(Exception):
        await self._sink.event(name, **{**self._ctx.get(), **fields})

exception async

exception(
    name: str, error: Exception, **fields: Any
) -> None

Emit a named error event carrying the exception instance.

Context fields bound via bind() are merged in automatically. Never raises — sink errors are swallowed to protect the runtime.

Parameters:

Name Type Description Default
name str

Dot-namespaced event name (e.g. step.error).

required
error Exception

The exception to record.

required
**fields Any

Additional metadata to attach to the event.

{}
Source code in src/ant_ai/observer/obs.py
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
async def exception(self, name: str, error: Exception, **fields: Any) -> None:
    """Emit a named error event carrying the exception instance.

    Context fields bound via `bind()` are merged in automatically.
    Never raises — sink errors are swallowed to protect the runtime.

    Args:
        name: Dot-namespaced event name (e.g. `step.error`).
        error: The exception to record.
        **fields: Additional metadata to attach to the event.
    """
    if self._sink is None:
        return
    with suppress(Exception):
        await self._sink.exception(name, error, **{**self._ctx.get(), **fields})

span

span(
    name: str, **attrs: Any
) -> AbstractAsyncContextManager[Any]

Return an async context manager representing a unit of work.

Returns a no-op context manager when no sink is configured. Context fields bound via bind() are merged into the span attributes.

Parameters:

Name Type Description Default
name str

Operation name for the span (e.g. llm, tool).

required
**attrs Any

Additional attributes to attach to the span.

{}

Returns:

Type Description
AbstractAsyncContextManager[Any]

An async context manager that opens and closes the span.

Source code in src/ant_ai/observer/obs.py
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
def span(self, name: str, **attrs: Any) -> AbstractAsyncContextManager[Any]:
    """Return an async context manager representing a unit of work.

    Returns a no-op context manager when no sink is configured.
    Context fields bound via `bind()` are merged into the span attributes.

    Args:
        name: Operation name for the span (e.g. `llm`, `tool`).
        **attrs: Additional attributes to attach to the span.

    Returns:
        An async context manager that opens and closes the span.
    """
    if self._sink is None:
        return _noop_span()
    return self._sink.span(name, **{**self._ctx.get(), **attrs})

propagation_headers

propagation_headers() -> dict[str, str]

Return headers encoding the current trace context for outbound requests.

Source code in src/ant_ai/observer/obs.py
 97
 98
 99
100
101
102
103
def propagation_headers(self) -> dict[str, str]:
    """Return headers encoding the current trace context for outbound requests."""
    if self._sink is None:
        return {}
    with suppress(Exception):
        return self._sink.propagation_headers()
    return {}

attach_propagation_context

attach_propagation_context(
    headers: dict[str, str],
) -> Iterator[None]

Context manager that restores a remote trace context for the duration of the block.

Source code in src/ant_ai/observer/obs.py
105
106
107
108
109
110
111
112
113
114
115
116
117
@contextmanager
def attach_propagation_context(self, headers: dict[str, str]) -> Iterator[None]:
    """Context manager that restores a remote trace context for the duration of the block."""
    if self._sink is None:
        yield
        return
    try:
        cm = self._sink.attach_propagation_context(headers)
    except Exception:
        yield
        return
    with cm:
        yield