
Architecture Overview

Components

| Module | Component | Responsibility |
| --- | --- | --- |
| ant_ai.agent | Agent | Core reasoning unit. Runs the ReAct loop: queries the LLM, executes tool calls, and streams events until a final answer is reached. |
| ant_ai.workflow | Workflow | Directed graph of nodes (actions) connected by static edges or conditional routers. Orchestrates what the agent does and in what order. |
| ant_ai.tools | Tool | Callables exposed to the LLM via JSON schema. Defined with the @tool decorator or as a Tool subclass for grouped namespaces. |
| | ToolRegistry | Built automatically from the agent's tool list. Expands namespace tools into individually callable entries. |
| ant_ai.a2a | Colony | Multi-agent coordinator. Registers agents with their workflows and A2A cards, wires collaboration edges, and produces ASGI apps for deployment. |
| | A2AExecutor | ASGI request handler. Receives incoming A2A requests, initialises InvocationContext and State, drives Workflow.stream(), and translates events to A2A task updates. |
| | A2AAgentTool | A Tool that calls a remote agent over HTTP. Added to source agents automatically by Colony.collab(). |
| ant_ai.core | AgentEvent / WorkflowEvent | Typed events emitted by the agent and workflow respectively. All progress (LLM output, tool calls, node transitions) is observable through this stream. |
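To make the @tool pattern from the table concrete, here is a minimal, self-contained sketch of a decorator that derives a JSON-schema tool description from a function's signature. This is illustrative only, not ant_ai's implementation; the schema field names are assumptions.

```python
import inspect

# Map Python annotations to JSON-schema type names (illustrative subset).
PY_TO_JSON = {int: "integer", float: "number", str: "string", bool: "boolean"}

def tool(fn):
    """Sketch of a @tool-style decorator: attach a JSON-schema description
    that an LLM client could expose as a callable tool."""
    params = {
        name: {"type": PY_TO_JSON.get(p.annotation, "string")}
        for name, p in inspect.signature(fn).parameters.items()
    }
    fn.schema = {
        "name": fn.__name__,
        "description": (fn.__doc__ or "").strip(),
        "parameters": {
            "type": "object",
            "properties": params,
            "required": list(params),
        },
    }
    return fn

@tool
def get_weather(city: str) -> str:
    """Return the current weather for a city."""
    return f"Sunny in {city}"
```

A ToolRegistry-like component would collect these generated schemas and hand them to the LLM provider as the agent's available tools.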

A high-level view of how these components interact is shown below:

```mermaid
---
title: Component relationships
---
flowchart TD
    Client(["Client"])
    Remote(["Remote agent"])

    subgraph ext_llm["LLM provider"]
        LLM["LiteLLM / OpenAI"]
    end

    subgraph hm["ant_ai"]
        Colony["Colony"]
        A2AS["A2AServer"]
        Exec["A2AExecutor"]
        Agent["Agent"]

        subgraph reg["ToolRegistry"]
            Tool["Tool"]
            A2AT["A2AAgentTool"]
        end

        subgraph wf["Workflow"]
            direction LR
            S(("START")) --> N1["Node"]
            N1 -->|router| N2["Node"]
            N1 -->|router| N3["Node"]
            N2 --> E(("END"))
            N3 --> E
        end

        Colony -.->|"wires & deploys"| A2AS
        Colony -.->|"wires & deploys"| Agent
        A2AS --> Exec
        Exec --> wf
        wf --> Agent
        Agent --> reg
    end

    subgraph ext_a2a["a2a SDK"]
        SDK["RequestHandler"]
    end

    Client -->|"A2A request"| A2AS
    Exec -.->|"extends"| SDK
    Agent -.->|"LLM calls"| LLM
    A2AT -->|"A2A call"| Remote
    Exec -->|"streamed updates"| Client
```

Flow of a Request

In the A2A integration, A2AExecutor.execute() acts as the streaming entrypoint: it ensures there is an active Task, builds an InvocationContext and initial State (including converted history), then consumes the asynchronous event stream produced by Workflow.stream(). The workflow is a directed graph of actions (nodes) connected by static edges or conditional routers; it opens with a StartEvent, wraps each node in a pair of UpdateEvents (node started / node completed), and closes with a CompletedEvent. Within each node the Agent runs its ReAct loop, emitting ReasoningEvent (extended-thinking models only), ToolCallingEvent / ToolResultEvent for each tool round-trip, and finally a FinalAnswerEvent when the LLM produces a plain-text response. Every event is translated via HVEventToA2A.apply() and applied to the task through TaskUpdater, streaming incremental updates back to the client until the workflow reaches a terminal state.
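The translate-and-forward loop described above can be sketched in a few lines of self-contained Python: an async generator stands in for Workflow.stream(), and the executor consumes it, converting each (kind, content) event into a client-facing update until a terminal event arrives. All names here are illustrative assumptions, not ant_ai's actual API.

```python
import asyncio

async def fake_workflow_stream():
    # Stand-in for Workflow.stream(): emits (kind, content) pairs in the
    # lifecycle order described above.
    yield ("start", "")
    yield ("update", "Starting action 'plan'")
    yield ("final_answer", "42")
    yield ("completed", "42")

async def execute(updates: list) -> None:
    """Sketch of the executor's role: consume the workflow's event stream,
    translate each event, and forward it as a task update."""
    async for kind, content in fake_workflow_stream():
        # In ant_ai this translation is done by HVEventToA2A.apply() and
        # applied to the task through TaskUpdater.
        updates.append((kind, content))
        if kind == "completed":  # terminal state ends the stream
            break

updates: list = []
asyncio.run(execute(updates))
```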

```mermaid
---
title: Architecture (event streaming)
---
sequenceDiagram
    autonumber
    participant Client as User/Agent
    participant Exec as A2AExecutor
    participant WF as Workflow
    participant Act as Action
    participant Agent as Agent
    participant LLM as LLM

    Client->>Exec: Send request
    Exec->>WF: Start workflow stream

        WF-->>Exec: StartEvent
        Exec-->>Client: Stream update

        loop For each action/node (many)
            WF-->>Exec: UpdateEvent (node started)
            Exec-->>Client: Stream update

            WF->>Act: Run action
            Act->>Agent: Run agent logic

            loop ReAct iterations (many)
                Agent->>LLM: Query LLM
                LLM-->>Agent: Response

                opt Reasoning (extended thinking models)
                    Agent-->>Exec: ReasoningEvent
                    Exec-->>Client: Stream update
                end

                alt Tool calls requested
                    Agent-->>Exec: ToolCallingEvent
                    Exec-->>Client: Stream update
                    Agent-->>Exec: ToolResultEvent (per tool)
                    Exec-->>Client: Stream update
                else No tool calls
                    Agent-->>Exec: FinalAnswerEvent
                    Exec-->>Client: Stream update
                end
            end

            WF-->>Exec: UpdateEvent (node completed)
            Exec-->>Client: Stream update

            WF->>WF: Select next action (edge/router)
        end

        WF-->>Exec: CompletedEvent
        Exec-->>Client: Stream update
```

Events

Every step of agent and workflow execution is observable through a stream of typed events. All events extend Event.

Workflow events

Emitted by Workflow to signal lifecycle transitions and node progress.

| Class | kind | When emitted |
| --- | --- | --- |
| StartEvent | "start" | Workflow begins execution |
| UpdateEvent | "update" | Node started ("Starting action 'X'") or completed ("Completed action 'X'") |
| CompletedEvent | "completed" | Workflow finished successfully; content holds the final answer |
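The table implies a small typed hierarchy: a base Event carrying a kind discriminator, specialised per lifecycle stage. A minimal sketch of that shape, with field names assumed for illustration (ant_ai's actual class definitions may differ):

```python
from dataclasses import dataclass

@dataclass
class Event:
    # Base class for all typed events; `kind` discriminates the event type.
    kind: str
    content: str = ""

@dataclass
class StartEvent(Event):
    kind: str = "start"

@dataclass
class UpdateEvent(Event):
    kind: str = "update"

@dataclass
class CompletedEvent(Event):
    kind: str = "completed"

# A well-formed workflow stream opens with "start", carries node updates,
# and closes with "completed" holding the final answer.
stream = [
    StartEvent(),
    UpdateEvent(content="Starting action 'plan'"),
    UpdateEvent(content="Completed action 'plan'"),
    CompletedEvent(content="42"),
]
```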

Agent events

Emitted by the agent's ReAct loop during LLM calls and tool execution.

| Class | kind | When emitted |
| --- | --- | --- |
| ReasoningEvent | "reasoning" | Model produced extended thinking / reasoning text (before the answer) |
| ToolCallingEvent | "tool_calling" | LLM response contains tool calls; message holds the ToolCallMessage |
| ToolResultEvent | "tool_result" | One tool call completed; content holds the result |
| FinalAnswerEvent | "final_answer" | LLM produced a plain-text response (no tool calls); ends the ReAct loop |
| MaxStepsReachedEvent | "max_steps_reached" | Agent exhausted its maximum allowed ReAct steps |
| ClarificationNeededEvent | "input_required" | Agent determined it needs human input to proceed |
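The relationship between these events and the ReAct loop can be sketched in a few lines: each iteration inspects a (stubbed) LLM reply, tool calls yield a tool_calling / tool_result pair, a plain-text reply yields final_answer and ends the loop, and a step cap yields max_steps_reached. This is an illustrative sketch only, not ant_ai's agent implementation.

```python
def react_loop(llm_replies, tools, max_steps=5):
    """Emit (kind, payload) pairs for each step of a toy ReAct loop."""
    events = []
    for step, reply in enumerate(llm_replies):
        if step >= max_steps:
            events.append(("max_steps_reached", None))
            break
        if "tool" in reply:  # LLM requested a tool call
            name, arg = reply["tool"]
            events.append(("tool_calling", name))
            events.append(("tool_result", tools[name](arg)))
        else:  # plain-text response ends the loop
            events.append(("final_answer", reply["text"]))
            break
    return events

# One tool round-trip, then a final answer.
tools = {"add_one": lambda x: x + 1}
replies = [{"tool": ("add_one", 41)}, {"text": "The answer is 42"}]
events = react_loop(replies, tools)
```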

Consuming events

```python
from ant_ai.core.events import (
    StartEvent, UpdateEvent, CompletedEvent,
    ReasoningEvent, ToolCallingEvent, ToolResultEvent,
    FinalAnswerEvent, MaxStepsReachedEvent, ClarificationNeededEvent,
)

async for event in workflow.stream(agent, ctx=ctx, state=state):
    match event:
        case StartEvent():
            print("workflow started")
        case UpdateEvent():
            print("node update:", event.content, f"[{event.origin.node}]")
        case ReasoningEvent():
            print("thinking:", event.content)
        case ToolCallingEvent():
            print("calling tools:", event.message)
        case ToolResultEvent():
            print("tool result:", event.content)
        case FinalAnswerEvent():
            print("answer:", event.content)
        case MaxStepsReachedEvent():
            print("stopped: max ReAct steps reached")
        case ClarificationNeededEvent():
            print("needs input:", event.content)
        case CompletedEvent():
            print("workflow done:", event.content)
```