Skip to content

ant_ai.observer.composite

CompositeSink

Fans out all observability calls to a list of sinks.

Each method is dispatched to all sinks concurrently. Errors from individual sinks are swallowed via return_exceptions=True so a failing sink never affects the others.

Source code in src/ant_ai/observer/composite.py
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
class CompositeSink:
    """Fans out all observability calls to a list of sinks.

    Each method is dispatched to all sinks concurrently. Errors from
    individual sinks are swallowed via `return_exceptions=True` so a
    failing sink never affects the others.
    """

    def __init__(self, sinks: list[Any]) -> None:
        """
        Args:
            sinks: `ObservabilitySink` implementations to fan out to.
        """
        self.sinks: list[Any] = sinks

    async def event(self, name: str, **fields) -> None:
        """Fans out the event to all sinks concurrently."""
        await asyncio.gather(
            *(s.event(name, **fields) for s in self.sinks),
            return_exceptions=True,
        )

    async def exception(self, name: str, error: Exception, **fields) -> None:
        """Fans out the error event to all sinks concurrently."""
        await asyncio.gather(
            *(s.exception(name, error, **fields) for s in self.sinks),
            return_exceptions=True,
        )

    @asynccontextmanager
    async def span(self, name: str, **attrs):
        """Opens a span on all sinks concurrently and closes them on exit."""
        async with AsyncExitStack() as stack:
            for s in self.sinks:
                await stack.enter_async_context(s.span(name, **attrs))
            yield

    def propagation_headers(self) -> dict[str, str]:
        """Merges propagation headers from all sinks."""
        result: dict[str, str] = {}
        for s in self.sinks:
            result.update(s.propagation_headers())
        return result

    @contextmanager
    def attach_propagation_context(self, headers: dict[str, str]):
        """Restores propagation context in all sinks."""
        with ExitStack() as stack:
            for s in self.sinks:
                stack.enter_context(s.attach_propagation_context(headers))
            yield

__init__

__init__(sinks: list[Any]) -> None

Parameters:

Name Type Description Default
sinks list[Any]

ObservabilitySink implementations to fan out to.

required
Source code in src/ant_ai/observer/composite.py
16
17
18
19
20
21
def __init__(self, sinks: list[Any]) -> None:
    """
    Args:
        sinks: `ObservabilitySink` implementations to fan out to.
    """
    self.sinks: list[Any] = sinks

event async

event(name: str, **fields) -> None

Fans out the event to all sinks concurrently.

Source code in src/ant_ai/observer/composite.py
23
24
25
26
27
28
async def event(self, name: str, **fields) -> None:
    """Fans out the event to all sinks concurrently."""
    await asyncio.gather(
        *(s.event(name, **fields) for s in self.sinks),
        return_exceptions=True,
    )

exception async

exception(name: str, error: Exception, **fields) -> None

Fans out the error event to all sinks concurrently.

Source code in src/ant_ai/observer/composite.py
30
31
32
33
34
35
async def exception(self, name: str, error: Exception, **fields) -> None:
    """Fans out the error event to all sinks concurrently."""
    await asyncio.gather(
        *(s.exception(name, error, **fields) for s in self.sinks),
        return_exceptions=True,
    )

span async

span(name: str, **attrs)

Opens a span on all sinks concurrently and closes them on exit.

Source code in src/ant_ai/observer/composite.py
37
38
39
40
41
42
43
@asynccontextmanager
async def span(self, name: str, **attrs):
    """Opens a span on all sinks concurrently and closes them on exit."""
    async with AsyncExitStack() as stack:
        for s in self.sinks:
            await stack.enter_async_context(s.span(name, **attrs))
        yield

propagation_headers

propagation_headers() -> dict[str, str]

Merges propagation headers from all sinks.

Source code in src/ant_ai/observer/composite.py
45
46
47
48
49
50
def propagation_headers(self) -> dict[str, str]:
    """Merges propagation headers from all sinks."""
    result: dict[str, str] = {}
    for s in self.sinks:
        result.update(s.propagation_headers())
    return result

attach_propagation_context

attach_propagation_context(headers: dict[str, str])

Restores propagation context in all sinks.

Source code in src/ant_ai/observer/composite.py
52
53
54
55
56
57
58
@contextmanager
def attach_propagation_context(self, headers: dict[str, str]):
    """Restores propagation context in all sinks."""
    with ExitStack() as stack:
        for s in self.sinks:
            stack.enter_context(s.attach_propagation_context(headers))
        yield