Skip to content

ant_ai.a2a.client

A2AClient pydantic-model

Bases: BaseModel

Client for interacting with an agent via the A2A protocol. Encapsulates connection management, message sending, and response handling.

Show JSON schema:
{
  "$defs": {
    "A2AConfig": {
      "description": "Configuration used to set the connection with the A2A server.",
      "properties": {
        "endpoint": {
          "description": "The URL of the A2A server to connect to.",
          "title": "Endpoint",
          "type": "string"
        },
        "timeout": {
          "$ref": "#/$defs/TimeoutTypes",
          "description": "Timeout configuration for the A2A client. Can be a float (total timeout) or a dict with connect/read/write/pool timeouts. See httpx.Timeout for more details."
        },
        "agent_card_path": {
          "default": "/.well-known/agent-card.json",
          "description": "The path, on the remote url, to the agent card.",
          "title": "Agent Card Path",
          "type": "string"
        },
        "supported_protocol_bindings": {
          "default": [
            "JSONRPC",
            "HTTP+JSON"
          ],
          "description": "The supported A2A protocol bindings.",
          "items": {
            "type": "string"
          },
          "title": "Supported Protocol Bindings",
          "type": "array"
        },
        "streaming": {
          "default": true,
          "description": "Whether to enable streaming.",
          "title": "Streaming",
          "type": "boolean"
        },
        "propagate_trace_context": {
          "default": true,
          "description": "Whether to inject the current trace context into outbound A2A requests. Set to False when calling third-party agents you do not own.",
          "title": "Propagate Trace Context",
          "type": "boolean"
        }
      },
      "required": [
        "endpoint"
      ],
      "title": "A2AConfig",
      "type": "object"
    },
    "TimeoutTypes": {
      "anyOf": [
        {
          "type": "number"
        },
        {
          "maxItems": 4,
          "minItems": 4,
          "prefixItems": [
            {
              "anyOf": [
                {
                  "type": "number"
                },
                {
                  "type": "null"
                }
              ]
            },
            {
              "anyOf": [
                {
                  "type": "number"
                },
                {
                  "type": "null"
                }
              ]
            },
            {
              "anyOf": [
                {
                  "type": "number"
                },
                {
                  "type": "null"
                }
              ]
            },
            {
              "anyOf": [
                {
                  "type": "number"
                },
                {
                  "type": "null"
                }
              ]
            }
          ],
          "type": "array"
        },
        {
          "type": "null"
        }
      ]
    }
  },
  "description": "Client for interacting with an agent via the A2A protocol. Encapsulates connection management, message sending, and response handling.",
  "properties": {
    "config": {
      "$ref": "#/$defs/A2AConfig",
      "description": "Configuration for the A2A client, including endpoint, timeouts, and supported transports."
    }
  },
  "required": [
    "config"
  ],
  "title": "A2AClient",
  "type": "object"
}

Fields:

- config (A2AConfig)

Source code in src/ant_ai/a2a/client.py
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
class A2AClient(BaseModel):
    """
    Client for interacting with an agent via the A2A protocol. Encapsulates connection management, message sending, and response handling.

    The underlying HTTP client, A2A client and agent card are created lazily on
    first use and released by ``aclose()``. The class can be used as an async
    context manager, in which case ``aclose()`` runs on exit.
    """

    config: A2AConfig = Field(
        description="Configuration for the A2A client, including endpoint, timeouts, and supported transports."
    )

    # Lazily populated connection state; all three are reset to None by aclose().
    _agent_card: AgentCard | None = PrivateAttr(default=None)
    _httpx: AsyncClient | None = PrivateAttr(default=None)
    _client: Client | None = PrivateAttr(default=None)
    # Serializes client construction in _ensure_client().
    _init_lock: asyncio.Lock = PrivateAttr(default_factory=asyncio.Lock)
    # Converts raw A2A payloads into Event objects for send_message().
    _translator: A2AToHVEvent = PrivateAttr(default_factory=A2AToHVEvent)

    async def __aenter__(self) -> A2AClient:
        """Enter the async context; no connection is opened here."""
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        """Leave the async context, releasing resources via ``aclose()``."""
        await self.aclose()

    async def aclose(self) -> None:
        """Drop the cached client state and close the HTTP client, if any.

        The cached references are cleared before the HTTP client is closed, so
        the instance no longer points at the client while it is being closed.
        """
        httpx_client: AsyncClient | None = self._httpx
        self._httpx = None
        self._client = None
        self._agent_card = None
        if httpx_client is not None:
            await httpx_client.aclose()

    async def get_agent_card(self) -> AgentCard:
        """Fetch the AgentCard from the remote.

        Returns:
            An AgentCard object representing the remote agent's information.

        Raises:
            RuntimeError: If the agent card is still unset after the client
                has been initialized.
        """
        await self._ensure_client()
        if self._agent_card is None:
            raise RuntimeError("Agent card not initialized after _ensure_client()")
        return self._agent_card

    async def _ensure_client(self) -> Client:
        """Return the cached A2A client, building it on first use.

        Construction happens under ``_init_lock`` and the cache is re-checked
        after the lock is acquired, so a caller that waited on the lock reuses
        the client built by the previous holder. If building fails, the freshly
        created HTTP client is closed before the exception propagates.

        Returns:
            The A2A client bound to this instance's HTTP client.
        """
        if self._client is not None and self._httpx is not None:
            return self._client

        async with self._init_lock:
            # Another coroutine may have finished initialization while we waited.
            if self._client is not None and self._httpx is not None:
                return self._client

            httpx_client = AsyncClient(timeout=self.config.timeout)
            try:
                client = await self._build_a2a_client(httpx_client)
            except Exception:
                # Do not leak the HTTP client when initialization fails.
                await httpx_client.aclose()
                raise

            # Publish both references only after the build succeeded.
            self._httpx = httpx_client
            self._client = client
            return client

    async def _build_a2a_client(self, httpx_client: AsyncClient) -> Client:
        """Resolve the agent card (if not cached) and create the A2A client.

        Args:
            httpx_client: The HTTP client used for card resolution and passed
                to the created A2A client's configuration.

        Returns:
            The client returned by ``create_client`` for the resolved card.

        Raises:
            AgentClientError: If resolving the agent card fails for any reason.
        """
        if self._agent_card is None:
            resolver = A2ACardResolver(
                httpx_client=httpx_client,
                base_url=self.config.endpoint,
                agent_card_path=self.config.agent_card_path,
            )
            try:
                self._agent_card = await resolver.get_agent_card()
            except Exception as e:
                # `Exception` already covers HTTPError; every failure here is
                # reported to callers as AgentClientError.
                raise AgentClientError(f"Failed to resolve AgentCard: {e}") from e

        cfg = ClientConfig(
            httpx_client=httpx_client,
            supported_protocol_bindings=list(self.config.supported_protocol_bindings),
            streaming=self.config.streaming,
        )
        return await create_client(self._agent_card, client_config=cfg)

    async def send_message(
        self,
        message: str,
        *,
        request_metadata: dict | None = None,
        context_id: str | None = None,
        reference_task_ids: list[str] | None = None,
    ) -> AsyncIterator[Event]:
        """Sends a message to the agent and yields events as responses are received.

        Args:
            message: The message to send to the agent.
            request_metadata: Optional metadata to include with the request.
            context_id: Optional context ID for the message. If not provided, a new UUID will be generated.
            reference_task_ids: Optional list of task IDs that this message references, for better traceability in task management.

        Yields:
            An event representing a response from the agent, translated from the raw A2A response.

        Raises:
            AgentClientError: If the request times out, fails with an HTTP
                error, or any other exception is raised while streaming.
        """

        client: Client = await self._ensure_client()

        msg = Message(
            role=Role.ROLE_USER,
            parts=[Part(text=message)],
            message_id=str(uuid4()),
            context_id=context_id or str(uuid4()),
        )
        if reference_task_ids:
            msg.reference_task_ids.extend(reference_task_ids)

        request = SendMessageRequest(message=msg)

        # Trace headers go in first so caller-supplied metadata wins on key clashes.
        if self.config.propagate_trace_context:
            request.metadata.update(obs.propagation_headers())
        if request_metadata:
            request.metadata.update(request_metadata)

        try:
            async for chunk in client.send_message(request):
                # Pick whichever payload field is set; chunks with none of
                # these fields are skipped.
                if chunk.HasField("status_update"):
                    raw = chunk.status_update
                elif chunk.HasField("message"):
                    raw = chunk.message
                elif chunk.HasField("task"):
                    raw = chunk.task
                elif chunk.HasField("artifact_update"):
                    raw = chunk.artifact_update
                else:
                    continue
                ev: Event | None = self._translator.translate(raw)
                # The translator may return None, in which case nothing is yielded.
                if ev is not None:
                    yield ev

        except TimeoutException as e:
            raise AgentClientError(f"Agent request timed out: {e}") from e
        except HTTPError as e:
            raise AgentClientError(f"Agent HTTP error: {e}") from e
        except Exception as e:
            raise AgentClientError(f"Agent client error: {e}") from e

config pydantic-field

config: A2AConfig

Configuration for the A2A client, including endpoint, timeouts, and supported transports.

get_agent_card async

get_agent_card() -> AgentCard

Fetch the AgentCard from the remote.

Returns:

Type Description
AgentCard

An AgentCard object representing the remote agent's information.

Source code in src/ant_ai/a2a/client.py
58
59
60
61
62
63
64
65
66
67
async def get_agent_card(self) -> AgentCard:
    """Return the remote agent's AgentCard, initializing the client if needed.

    Returns:
        The AgentCard held by this client once ``_ensure_client()`` has run.

    Raises:
        RuntimeError: If no agent card is set after ``_ensure_client()``.
    """
    await self._ensure_client()
    card = self._agent_card
    if card is not None:
        return card
    raise RuntimeError("Agent card not initialized after _ensure_client()")

send_message async

send_message(
    message: str,
    *,
    request_metadata: dict | None = None,
    context_id: str | None = None,
    reference_task_ids: list[str] | None = None,
) -> AsyncIterator[Event]

Sends a message to the agent and yields events as responses are received.

Parameters:

Name Type Description Default
message str

The message to send to the agent.

required
request_metadata dict | None

Optional metadata to include with the request.

None
context_id str | None

Optional context ID for the message. If not provided, a new UUID will be generated.

None
reference_task_ids list[str] | None

Optional list of task IDs that this message references, for better traceability in task management.

None

Yields:

Type Description
AsyncIterator[Event]

An event representing a response from the agent, translated from the raw A2A response.

Source code in src/ant_ai/a2a/client.py
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
async def send_message(
    self,
    message: str,
    *,
    request_metadata: dict | None = None,
    context_id: str | None = None,
    reference_task_ids: list[str] | None = None,
) -> AsyncIterator[Event]:
    """Send a user message to the agent and stream back translated events.

    Args:
        message: Text to send to the agent.
        request_metadata: Optional metadata merged into the request.
        context_id: Optional context ID for the message; a new UUID is
            generated when it is not provided.
        reference_task_ids: Optional task IDs this message refers to.

    Yields:
        Each Event the translator produces from a raw A2A response.

    Raises:
        AgentClientError: On timeout, HTTP error, or any other exception
            raised while streaming the response.
    """
    a2a_client: Client = await self._ensure_client()

    outbound = Message(
        role=Role.ROLE_USER,
        parts=[Part(text=message)],
        message_id=str(uuid4()),
        context_id=context_id or str(uuid4()),
    )
    if reference_task_ids:
        outbound.reference_task_ids.extend(reference_task_ids)

    req = SendMessageRequest(message=outbound)

    # Trace headers are applied first, then caller-supplied metadata on top.
    if self.config.propagate_trace_context:
        req.metadata.update(obs.propagation_headers())
    if request_metadata:
        req.metadata.update(request_metadata)

    # Payload fields are probed in this order; the first one set is used.
    payload_fields = ("status_update", "message", "task", "artifact_update")

    try:
        async for item in a2a_client.send_message(req):
            for field_name in payload_fields:
                if item.HasField(field_name):
                    payload = getattr(item, field_name)
                    break
            else:
                # None of the known payload fields is set: skip this item.
                continue

            event: Event | None = self._translator.translate(payload)
            if event is None:
                continue
            yield event

    except TimeoutException as err:
        raise AgentClientError(f"Agent request timed out: {err}") from err
    except HTTPError as err:
        raise AgentClientError(f"Agent HTTP error: {err}") from err
    except Exception as err:
        raise AgentClientError(f"Agent client error: {err}") from err