Skip to content

ant_ai.agent.loop.react

ToolRequest dataclass

LLM produced tool calls and act_step is configured — run the tools.

Source code in src/ant_ai/agent/loop/react.py
29
30
31
@dataclass(frozen=True)
class ToolRequest:
    """Signal that the LLM requested tool calls and an act_step exists to run them."""

FinalResponse dataclass

LLM produced a final text answer, end the loop.

Source code in src/ant_ai/agent/loop/react.py
34
35
36
@dataclass(frozen=True)
class FinalResponse:
    """Signal that the LLM returned a final text answer; the loop should end."""

ReActLoop pydantic-model

Bases: BaseAgentLoop

Runs the default ReAct loop.

Config:

  • arbitrary_types_allowed: True

Fields:

Source code in src/ant_ai/agent/loop/react.py
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
class ReActLoop(BaseAgentLoop):
    """
    Runs the default ReAct loop.

    Each iteration asks the LLM to reason; if the LLM requests tool calls
    and a tool step is configured, the tools are run and their results are
    appended to the conversation. The loop ends when the LLM produces a
    final text answer or the step budget is exhausted.
    """

    # LLM step invoked once per loop iteration.
    reason_step: LLMStep
    # Tool-execution step; None means the agent has no tools configured.
    act_step: ToolStep | None = None

    async def stream(
        self,
        state: State,
        ctx: InvocationContext | None,
        *,
        max_steps: int = 10,
        response_schema: type[BaseModel] | None = None,
    ) -> AsyncIterator[Event]:
        """
        Drive the ReAct loop, yielding events as they occur.

        Args:
            state: Mutable conversation state; messages are appended in place.
            ctx: Optional invocation context forwarded to hooks and steps.
            max_steps: Upper bound on reason/act iterations.
            response_schema: Optional pydantic model the final answer must
                match. When the agent has no tools it is set as the reason
                step's response_format; otherwise it is enforced afterwards
                via ``_make_final_answer``.

        Yields:
            Intermediate step events, then a FinalAnswerEvent on success or
            a MaxStepsReachedEvent when the budget runs out.

        Raises:
            RuntimeError: if a step produces no StepResult, or the LLM
                requests tool calls while no tools are configured.
            TypeError: if a step result carries an unexpected output type.
        """
        # No tools + schema: apply the schema directly via response_format.
        # With tools configured, keep the plain reason step and coerce the
        # final answer later instead (see coerce_schema below).
        active_step: LLMStep = (
            self.reason_step.model_copy(update={"response_format": response_schema})
            if self.act_step is None and response_schema
            else self.reason_step
        )
        # Schema used only on the post-hoc coercion path (tools configured).
        coerce_schema: type[BaseModel] | None = (
            response_schema if self.act_step is not None else None
        )

        for loop_step in range(1, max_steps + 1):
            llm_result: StepResult | None = None

            await self.hooks.run_before_model(state, ctx)

            # Stream the LLM step: capture its StepResult and forward every
            # other event except its own FinalAnswerEvent (see below).
            async for item in self._run_model_with_hooks(active_step, state, ctx):
                if isinstance(item, StepResult):
                    llm_result: StepResult = item
                elif not isinstance(item, FinalAnswerEvent):
                    # LLMStep emits a FinalAnswerEvent before its StepResult as a side-effect.
                    # We emit the real FinalAnswerEvent below after optional schema handling.
                    yield item

            if llm_result is None:
                raise RuntimeError("LLM step produced no result")
            if not isinstance(llm_result.output, LLMOutput):
                raise TypeError(
                    f"Expected LLMOutput, got {type(llm_result.output).__name__}"
                )

            match self._classify_llm_result(llm_result):
                case ToolRequest():
                    # Defensive re-check: _classify_llm_result already raises
                    # for this case, so this branch is normally unreachable.
                    if self.act_step is None:
                        raise RuntimeError(
                            "LLM requested tool calls but no tools are configured on this agent."
                        )
                    # Record the assistant's tool-call request in history.
                    state.add_message(
                        ToolCallMessage(tool_calls=list(llm_result.output.tool_calls))
                    )
                    # Let hooks wrap/observe the tool invocation.
                    wrapped_tools: WrapCall = self.hooks.wrap_tool_call(
                        self.act_step.run
                    )
                    act_result: StepResult | None = None
                    async for item in self._observe_step(
                        self.act_step, wrapped_tools(state, ctx)
                    ):
                        if isinstance(item, StepResult):
                            act_result = item
                        else:
                            yield item
                    if act_result is None:
                        raise RuntimeError("Tool step produced no result")
                    # A tool needs user clarification: end the stream without
                    # a final answer.
                    if isinstance(act_result.output, ClarificationNeededOutput):
                        return
                    if not isinstance(act_result.output, ToolOutput):
                        raise TypeError(
                            f"Expected ToolOutput, got {type(act_result.output).__name__}"
                        )
                    # Feed each tool result back so the next reasoning pass
                    # can see it.
                    for r in act_result.output.results:
                        state.add_message(
                            ToolCallResultMessage(
                                name=r["name"],
                                tool_call_id=r["tool_call_id"],
                                content=r["content"],
                            )
                        )

                case FinalResponse():
                    # Optionally coerce to the schema, record the answer in
                    # state, emit it, and stop.
                    final_event = await self._make_final_answer(
                        llm_result.output.raw, loop_step, coerce_schema, ctx
                    )
                    state.add_message(
                        Message(role="assistant", content=final_event.content)
                    )
                    yield final_event
                    return

        # Step budget exhausted without a final answer.
        yield MaxStepsReachedEvent(
            origin=EventOrigin(layer="agent", run_step=max_steps),
        )

    def _classify_llm_result(self, result: StepResult) -> ToolRequest | FinalResponse:
        """Classify what the loop should do next based on the LLM step result."""
        if not isinstance(result.output, LLMOutput):
            raise TypeError(f"Expected LLMOutput, got {type(result.output).__name__}")
        has_tool_calls: bool = result.output.has_tool_calls
        # Any transition other than END means the step wants the loop to go on.
        is_continue: bool = result.transition.action != TransitionAction.END
        if is_continue and has_tool_calls:
            # Tool calls without a configured tool step is a setup error.
            if self.act_step is None:
                raise RuntimeError(
                    "LLM requested tool calls but no tools are configured on this agent."
                )
            return ToolRequest()
        return FinalResponse()

    async def _make_final_answer(
        self,
        text: str,
        loop_step: int,
        response_schema: type[BaseModel] | None,
        ctx: InvocationContext | None,
    ) -> FinalAnswerEvent:
        """
        Build the FinalAnswerEvent for *text*, coercing it to
        *response_schema* first when one is supplied.
        """
        final_text: str = text
        if response_schema is not None:
            final_text = await self._coerce_to_schema(text, response_schema, ctx)

        return FinalAnswerEvent(
            origin=EventOrigin(layer="agent", run_step=loop_step),
            content=final_text,
        )

    async def _coerce_to_schema(
        self,
        text: str,
        schema: type[BaseModel],
        ctx: InvocationContext | None,
    ) -> str:
        """
        Return valid JSON matching schema.

        Try direct validation first — if the LLM already produced valid JSON,
        return it as-is. Only call _structure() as a repair pass on failure.
        """
        try:
            schema.model_validate_json(text)
            return text
        except Exception:
            # Any validation failure (not just ValidationError) falls through
            # to the LLM repair pass.
            return await self._structure(text, schema, ctx)

    async def _structure(
        self,
        text: str,
        schema: type[BaseModel],
        ctx: InvocationContext | None,
    ) -> str:
        """
        One extra LLM call that converts free text into JSON matching schema.

        Uses response_format so constrained decoding guarantees valid output —
        no retry loop, no hook needed.
        """
        # Fresh single-purpose step; reuses the reasoning LLM but carries no
        # tools, so the model can only produce the structured answer.
        structuring_step = LLMStep(
            llm=self.reason_step.llm,
            system_message=Message(
                role="system",
                content=(
                    "Convert the following text into a JSON object. "
                    "Respond with valid JSON only, no explanation."
                ),
            ),
            serialized_tools=[],
            response_format=schema,
        )
        # Fresh state so the structuring call never touches the main
        # conversation history.
        structuring_state = State()
        structuring_state.add_message(Message(role="user", content=text))

        _, result = await self._consume_wrapped(
            structuring_step,
            structuring_step.run,
            structuring_state,
            ctx,
        )
        if not isinstance(result.output, LLMOutput):
            raise TypeError(
                f"Expected LLMOutput from structuring step, got {type(result.output).__name__}"
            )
        return result.output.raw

    def register_tool(self, registry: ToolRegistry) -> None:
        """Update internal steps to reflect a newly registered tool in registry."""
        # Refresh the serialized tool schemas the LLM sees.
        self.reason_step.serialized_tools = registry.to_serialized()
        # Lazily create the act step the first time any tool is registered.
        if self.act_step is None:
            self.act_step = ToolStep(registry=registry)

register_tool

register_tool(registry: ToolRegistry) -> None

Update internal steps to reflect a newly registered tool in registry.

Source code in src/ant_ai/agent/loop/react.py
222
223
224
225
226
def register_tool(self, registry: ToolRegistry) -> None:
    """Keep the reason/act steps in sync after a tool is added to *registry*."""
    # Create the tool-execution step lazily on first registration.
    if self.act_step is None:
        self.act_step = ToolStep(registry=registry)
    # Re-serialize so the LLM's tool schema list includes the new tool.
    self.reason_step.serialized_tools = registry.to_serialized()