Skip to content

ant_ai.observer.protocol

ObservabilitySink

Bases: Protocol

Protocol for observability backends.

Implementations must be safe to call concurrently and must never raise — wrap any internal errors in a try/except.

Source code in src/ant_ai/observer/protocol.py
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
class ObservabilitySink(Protocol):
    """Protocol for observability backends.

    Implementations must be safe to call concurrently and must never raise —
    wrap any internal errors in a try/except.
    """

    async def event(self, name: str, **fields: object) -> None:
        """Emit a named lifecycle event with structured metadata.

        Event names follow dot-namespaced conventions, e.g.
        `workflow.start`, `node.error`, `step.end`.

        Args:
            name: Dot-namespaced event name.
            **fields: Arbitrary structured metadata to attach to the event.
        """
        ...

    async def exception(self, name: str, error: Exception, **fields: object) -> None:
        """Emit an error event carrying the exception instance.

        Tracing backends should call `span.record_exception`; log backends
        should log the exception alongside the structured fields.

        Args:
            name: Dot-namespaced event name.
            error: The exception to record.
            **fields: Arbitrary structured metadata to attach to the event.
        """
        ...

    def span(self, name: str, **attrs: object) -> AbstractAsyncContextManager:
        """Return an async context manager representing a unit of work.

        Span names use operation names, not lifecycle suffixes (e.g. `llm`,
        `tool`, `router`). Attributes carry per-operation metadata such as
        `model`, `tool_name`, `messages`, or `session_id`.

        Args:
            name: Operation name for the span.
            **attrs: Metadata attributes to attach to the span.

        Returns:
            An async context manager that opens and closes the span.
        """
        ...

    def propagation_headers(self) -> dict[str, str]:
        """Return headers encoding the current trace context for outbound requests.

        Used to propagate distributed trace context across process boundaries
        (e.g. into A2A HTTP calls). Returns an empty dict when tracing is
        inactive or not supported.
        """
        ...

    def attach_propagation_context(
        self, headers: dict[str, str]
    ) -> AbstractContextManager:
        """Return a context manager that restores a remote trace context.

        Extracts the trace context encoded in *headers* (e.g. from an inbound
        A2A request) and makes it the active context for the duration of the
        block, so child observations are nested under the remote parent span.

        Args:
            headers: Carrier dict previously produced by `propagation_headers`.
        """
        ...

event async

event(name: str, **fields: object) -> None

Emit a named lifecycle event with structured metadata.

Event names follow dot-namespaced conventions, e.g. workflow.start, node.error, step.end.

Parameters:

Name Type Description Default
name str

Dot-namespaced event name.

required
**fields object

Arbitrary structured metadata to attach to the event.

{}
Source code in src/ant_ai/observer/protocol.py
14
15
16
17
18
19
20
21
22
23
24
async def event(self, name: str, **fields: object) -> None:
    """Emit a named lifecycle event with structured metadata.

    Event names follow dot-namespaced conventions, e.g.
    `workflow.start`, `node.error`, `step.end`.

    Args:
        name: Dot-namespaced event name.
        **fields: Arbitrary structured metadata to attach to the event.
    """
    ...

exception async

exception(
    name: str, error: Exception, **fields: object
) -> None

Emit an error event carrying the exception instance.

Tracing backends should call span.record_exception; log backends should log the exception alongside the structured fields.

Parameters:

Name Type Description Default
name str

Dot-namespaced event name.

required
error Exception

The exception to record.

required
**fields object

Arbitrary structured metadata to attach to the event.

{}
Source code in src/ant_ai/observer/protocol.py
26
27
28
29
30
31
32
33
34
35
36
37
async def exception(self, name: str, error: Exception, **fields: object) -> None:
    """Emit an error event carrying the exception instance.

    Tracing backends should call `span.record_exception`; log backends
    should log the exception alongside the structured fields.

    Args:
        name: Dot-namespaced event name.
        error: The exception to record.
        **fields: Arbitrary structured metadata to attach to the event.
    """
    ...

span

span(
    name: str, **attrs: object
) -> AbstractAsyncContextManager

Return an async context manager representing a unit of work.

Span names use operation names, not lifecycle suffixes (e.g. llm, tool, router). Attributes carry per-operation metadata such as model, tool_name, messages, or session_id.

Parameters:

Name Type Description Default
name str

Operation name for the span.

required
**attrs object

Metadata attributes to attach to the span.

{}

Returns:

Type Description
AbstractAsyncContextManager

An async context manager that opens and closes the span.

Source code in src/ant_ai/observer/protocol.py
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
def span(self, name: str, **attrs: object) -> AbstractAsyncContextManager:
    """Return an async context manager representing a unit of work.

    Span names use operation names, not lifecycle suffixes (e.g. `llm`,
    `tool`, `router`). Attributes carry per-operation metadata such as
    `model`, `tool_name`, `messages`, or `session_id`.

    Args:
        name: Operation name for the span.
        **attrs: Metadata attributes to attach to the span.

    Returns:
        An async context manager that opens and closes the span.
    """
    ...

propagation_headers

propagation_headers() -> dict[str, str]

Return headers encoding the current trace context for outbound requests.

Used to propagate distributed trace context across process boundaries (e.g. into A2A HTTP calls). Returns an empty dict when tracing is inactive or not supported.

Source code in src/ant_ai/observer/protocol.py
55
56
57
58
59
60
61
62
def propagation_headers(self) -> dict[str, str]:
    """Return headers encoding the current trace context for outbound requests.

    Used to propagate distributed trace context across process boundaries
    (e.g. into A2A HTTP calls). Returns an empty dict when tracing is
    inactive or not supported.
    """
    ...

attach_propagation_context

attach_propagation_context(
    headers: dict[str, str],
) -> AbstractContextManager

Return a context manager that restores a remote trace context.

Extracts the trace context encoded in headers (e.g. from an inbound A2A request) and makes it the active context for the duration of the block, so child observations are nested under the remote parent span.

Parameters:

Name Type Description Default
headers dict[str, str]

Carrier dict previously produced by propagation_headers.

required
Source code in src/ant_ai/observer/protocol.py
64
65
66
67
68
69
70
71
72
73
74
75
76
def attach_propagation_context(
    self, headers: dict[str, str]
) -> AbstractContextManager:
    """Return a context manager that restores a remote trace context.

    Extracts the trace context encoded in *headers* (e.g. from an inbound
    A2A request) and makes it the active context for the duration of the
    block, so child observations are nested under the remote parent span.

    Args:
        headers: Carrier dict previously produced by `propagation_headers`.
    """
    ...