Skip to content

ant_ai.agent.base

BaseAgent pydantic-model

Bases: BaseModel

Base class for all agent implementations.

Owns the public invocation and streaming APIs, hook chain, and identity fields. Subclasses implement _make_loop() to provide the reasoning loop.

Fields:

Source code in src/ant_ai/agent/base.py
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
class BaseAgent(BaseModel):
    """
    Base class for all agent implementations.

    Owns the public invocation and streaming APIs, hook chain, and
    identity fields. Subclasses implement `_make_loop()` to provide the
    reasoning loop and `add_tool()` to register tools.
    """

    name: str = Field(
        default="BaseAgent",
        description="Display name used in routing and observability.",
    )
    system_prompt: str = Field(description="System instruction for the LLM.")
    # Validation is skipped for the LLM client; it is stored as given.
    llm: Annotated[ChatLLM, SkipValidation]
    description: str = Field(
        default="A base agent.",
        description="Human-readable description of the agent for documentation.",
    )
    tools: list[Tool] = Field(
        default_factory=list,
        description="Tools available to the agent.",
    )
    hooks: list[AgentHook] = Field(
        default_factory=list,
        description="Lifecycle hooks for the agent.",
    )
    max_retries: int = Field(
        default=3,
        ge=1,
        description="Maximum number of times to retry after a hook returns RETRY.",
    )

    # Derived state: populated by the `_build` validator once the public
    # fields have been validated.
    _registry: ToolRegistry = PrivateAttr()
    _loop: BaseAgentLoop = PrivateAttr()
    _hook_layer: HookLayer = PrivateAttr()

    model_config = ConfigDict(arbitrary_types_allowed=True)

    @model_validator(mode="after")
    def _build(self) -> BaseAgent:
        """Build the tool registry, hook layer, and reasoning loop."""
        # The loop is created last so `_make_loop()` can rely on the registry
        # and hook layer already being set.
        self._registry = ToolRegistry(self.tools)
        self._hook_layer = HookLayer(hooks=self.hooks)
        self._loop = self._make_loop()
        return self

    @abstractmethod
    def _make_loop(self) -> BaseAgentLoop:
        """Create and return the reasoning loop for this agent."""
        ...

    @property
    def system_message(self) -> Message:
        """Return this agent's system message."""
        return Message(role="system", content=self.system_prompt)

    async def stream(
        self,
        state: State,
        *,
        max_steps: int = 10,
        ctx: InvocationContext | None = None,
        response_schema: type[BaseModel] | None = None,
    ) -> AsyncGenerator[Event]:
        """
        Stream events for a single agent turn.

        Pass `response_schema` to request structured output on the
        `FinalAnswerEvent`. When provided, the final event content will be
        valid JSON matching that schema.

        Args:
            state: Current agent state, including message history and other
                execution context.
            max_steps: Maximum number of loop steps before stopping.
            ctx: Invocation context for the current run.
            response_schema: Schema for structured output on the
                `FinalAnswerEvent`.

        Yields:
            Event: Events produced during execution, including intermediate
                updates and the final answer event.
        """
        # Observability context falls back to an empty session id when no
        # invocation context is supplied.
        with obs.bind(
            session_id=ctx.session_id if ctx else "",
            agent_name=self.name,
        ):
            # The before-agent hooks run outside the `try`, so a failure there
            # skips the after-agent hooks.
            await self._hook_layer.run_before_agent(state, ctx)
            try:
                async for event in self._loop.stream(
                    state,
                    ctx,
                    max_steps=max_steps,
                    response_schema=response_schema,
                ):
                    yield event
            finally:
                # Runs whether the loop completes or raises.
                await self._hook_layer.run_after_agent(state, ctx)

    async def ainvoke(
        self,
        state: State,
        *,
        max_steps: int = 10,
        ctx: InvocationContext | None = None,
        response_schema: type[BaseModel] | None = None,
    ) -> str:
        """
        Run a single agent turn and return the final answer.

        This is the asynchronous counterpart to `invoke`. Internally, it
        consumes `stream()` and returns the content of the
        `FinalAnswerEvent`. If the stream yields no `FinalAnswerEvent`, an
        empty string is returned; if it yields several, the last one wins.

        Pass `response_schema` to request structured output on the
        `FinalAnswerEvent`. When provided, the returned string will contain
        valid JSON matching that schema.

        Args:
            state: Current agent state, including message history and other
                execution context.
            max_steps: Maximum number of loop steps before stopping.
            ctx: Invocation context for the current run.
            response_schema: Schema for structured output on the
                `FinalAnswerEvent`.

        Returns:
            The content of the `FinalAnswerEvent`.
        """
        # Annotate once, at the first binding, so type checkers do not see a
        # redefinition inside the loop.
        final: str = ""
        async for event in self.stream(
            state,
            max_steps=max_steps,
            ctx=ctx,
            response_schema=response_schema,
        ):
            if isinstance(event, FinalAnswerEvent):
                final = event.content

        return final

    def invoke(
        self,
        state: State,
        *,
        max_steps: int = 10,
        ctx: InvocationContext | None = None,
        response_schema: type[BaseModel] | None = None,
    ) -> str:
        """
        Run a single agent turn and return the final answer.

        This is the synchronous counterpart to `ainvoke`. Internally, it runs
        the asynchronous invocation path and returns the content of the
        `FinalAnswerEvent`.

        Pass `response_schema` to request structured output on the
        `FinalAnswerEvent`. When provided, the returned string will contain
        valid JSON matching that schema.

        Args:
            state: Current agent state, including message history and other
                execution context.
            max_steps: Maximum number of loop steps before stopping.
            ctx: Invocation context for the current run.
            response_schema: Schema for structured output on the
                `FinalAnswerEvent`.

        Returns:
            The content of the `FinalAnswerEvent`.

        Raises:
            RuntimeError: If called from a thread with an active event loop.
        """
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            # No loop is running in this thread, so it is safe to start one.
            pass
        else:
            raise RuntimeError(
                "invoke() cannot be called while an event loop is already running. "
                "Use `await ainvoke(...)` instead."
            )

        # Run outside the `except` block so errors raised by the agent are not
        # implicitly chained to the "no running event loop" probe error.
        return asyncio.run(
            self.ainvoke(
                state,
                max_steps=max_steps,
                ctx=ctx,
                response_schema=response_schema,
            )
        )

    @abstractmethod
    def add_tool(self, tool: Tool) -> None:
        """Register a tool with the agent."""
        ...

    @property
    def registry(self) -> ToolRegistry:
        """Return the tool registry for this agent."""
        return self._registry

name pydantic-field

name: str = 'BaseAgent'

Display name used in routing and observability.

system_prompt pydantic-field

system_prompt: str

System instruction for the LLM.

description pydantic-field

description: str = 'A base agent.'

Human-readable description of the agent for documentation.

tools pydantic-field

tools: list[Tool]

Tools available to the agent.

hooks pydantic-field

hooks: list[AgentHook]

Lifecycle hooks for the agent.

max_retries pydantic-field

max_retries: int = 3

Maximum number of times to retry after a hook returns RETRY.

system_message property

system_message: Message

Return this agent's system message.

registry property

registry: ToolRegistry

Return the tool registry for this agent.

stream async

stream(
    state: State,
    *,
    max_steps: int = 10,
    ctx: InvocationContext | None = None,
    response_schema: type[BaseModel] | None = None,
) -> AsyncGenerator[Event]

Stream events for a single agent turn.

Pass response_schema to request structured output on the FinalAnswerEvent. When provided, the final event content will be valid JSON matching that schema.

Parameters:

Name Type Description Default
state State

Current agent state, including message history and other execution context.

required
max_steps int

Maximum number of loop steps before stopping.

10
ctx InvocationContext | None

Invocation context for the current run.

None
response_schema type[BaseModel] | None

Schema for structured output on the FinalAnswerEvent.

None

Yields:

Name Type Description
Event AsyncGenerator[Event]

Events produced during execution, including intermediate updates and the final answer event.

Source code in src/ant_ai/agent/base.py
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
async def stream(
    self,
    state: State,
    *,
    max_steps: int = 10,
    ctx: InvocationContext | None = None,
    response_schema: type[BaseModel] | None = None,
) -> AsyncGenerator[Event]:
    """
    Yield the events of one agent turn as they are produced.

    The turn is wrapped in an observability binding and bracketed by the
    before-agent and after-agent hooks. Supplying `response_schema`
    requests structured output: the content of the `FinalAnswerEvent`
    will then be valid JSON matching that schema.

    Args:
        state: Current agent state, including message history and other
            execution context.
        max_steps: Maximum number of loop steps before stopping.
        ctx: Invocation context for the current run.
        response_schema: Schema for structured output on the
            `FinalAnswerEvent`.

    Yields:
        Event: Events produced during execution, including intermediate
            updates and the final answer event.
    """
    # An absent invocation context maps to an empty session id.
    binding = obs.bind(
        session_id=ctx.session_id if ctx else "",
        agent_name=self.name,
    )
    with binding:
        # Before-hooks sit outside the `try`: if they fail, the after-hooks
        # below are not run.
        await self._hook_layer.run_before_agent(state, ctx)
        try:
            turn_events = self._loop.stream(
                state,
                ctx,
                max_steps=max_steps,
                response_schema=response_schema,
            )
            async for item in turn_events:
                yield item
        finally:
            # After-hooks run whether the loop completes or raises.
            await self._hook_layer.run_after_agent(state, ctx)

ainvoke async

ainvoke(
    state: State,
    *,
    max_steps: int = 10,
    ctx: InvocationContext | None = None,
    response_schema: type[BaseModel] | None = None,
) -> str

Run a single agent turn and return the final answer.

This is the asynchronous counterpart to invoke. Internally, it consumes stream() and returns the content of the FinalAnswerEvent.

Pass response_schema to request structured output on the FinalAnswerEvent. When provided, the returned string will contain valid JSON matching that schema.

Parameters:

Name Type Description Default
state State

Current agent state, including message history and other execution context.

required
max_steps int

Maximum number of loop steps before stopping.

10
ctx InvocationContext | None

Invocation context for the current run.

None
response_schema type[BaseModel] | None

Schema for structured output on the FinalAnswerEvent.

None

Returns:

Type Description
str

The content of the FinalAnswerEvent.

Source code in src/ant_ai/agent/base.py
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
async def ainvoke(
    self,
    state: State,
    *,
    max_steps: int = 10,
    ctx: InvocationContext | None = None,
    response_schema: type[BaseModel] | None = None,
) -> str:
    """
    Run a single agent turn and return the final answer.

    This is the asynchronous counterpart to `invoke`. Internally, it
    consumes `stream()` and returns the content of the
    `FinalAnswerEvent`. If the stream yields no `FinalAnswerEvent`, an
    empty string is returned; if it yields several, the last one wins.

    Pass `response_schema` to request structured output on the
    `FinalAnswerEvent`. When provided, the returned string will contain
    valid JSON matching that schema.

    Args:
        state: Current agent state, including message history and other
            execution context.
        max_steps: Maximum number of loop steps before stopping.
        ctx: Invocation context for the current run.
        response_schema: Schema for structured output on the
            `FinalAnswerEvent`.

    Returns:
        The content of the `FinalAnswerEvent`.
    """
    # Annotate once, at the first binding, so type checkers do not see a
    # redefinition inside the loop.
    final: str = ""
    async for event in self.stream(
        state,
        max_steps=max_steps,
        ctx=ctx,
        response_schema=response_schema,
    ):
        if isinstance(event, FinalAnswerEvent):
            final = event.content

    return final

invoke

invoke(
    state: State,
    *,
    max_steps: int = 10,
    ctx: InvocationContext | None = None,
    response_schema: type[BaseModel] | None = None,
) -> str

Run a single agent turn and return the final answer.

This is the synchronous counterpart to ainvoke. Internally, it runs the asynchronous invocation path and returns the content of the FinalAnswerEvent.

Pass response_schema to request structured output on the FinalAnswerEvent. When provided, the returned string will contain valid JSON matching that schema.

Parameters:

Name Type Description Default
state State

Current agent state, including message history and other execution context.

required
max_steps int

Maximum number of loop steps before stopping.

10
ctx InvocationContext | None

Invocation context for the current run.

None
response_schema type[BaseModel] | None

Schema for structured output on the FinalAnswerEvent.

None

Returns:

Type Description
str

The content of the FinalAnswerEvent.

Raises:

Type Description
RuntimeError

If called from a thread with an active event loop.

Source code in src/ant_ai/agent/base.py
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
def invoke(
    self,
    state: State,
    *,
    max_steps: int = 10,
    ctx: InvocationContext | None = None,
    response_schema: type[BaseModel] | None = None,
) -> str:
    """
    Run a single agent turn and return the final answer.

    This is the synchronous counterpart to `ainvoke`. Internally, it runs
    the asynchronous invocation path and returns the content of the
    `FinalAnswerEvent`.

    Pass `response_schema` to request structured output on the
    `FinalAnswerEvent`. When provided, the returned string will contain
    valid JSON matching that schema.

    Args:
        state: Current agent state, including message history and other
            execution context.
        max_steps: Maximum number of loop steps before stopping.
        ctx: Invocation context for the current run.
        response_schema: Schema for structured output on the
            `FinalAnswerEvent`.

    Returns:
        The content of the `FinalAnswerEvent`.

    Raises:
        RuntimeError: If called from a thread with an active event loop.
    """
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No loop is running in this thread, so it is safe to start one.
        pass
    else:
        raise RuntimeError(
            "invoke() cannot be called while an event loop is already running. "
            "Use `await ainvoke(...)` instead."
        )

    # Run outside the `except` block so errors raised by the agent are not
    # implicitly chained to the "no running event loop" probe error.
    return asyncio.run(
        self.ainvoke(
            state,
            max_steps=max_steps,
            ctx=ctx,
            response_schema=response_schema,
        )
    )

add_tool abstractmethod

add_tool(tool: Tool) -> None

Register a tool with the agent.

Source code in src/ant_ai/agent/base.py
220
221
222
223
@abstractmethod
def add_tool(self, tool: Tool) -> None:
    """
    Register a tool with the agent.

    Abstract: concrete agents supply the registration logic.

    Args:
        tool: The tool to make available to the agent.
    """