Skip to content

ant_ai.a2a.context_builder

HistoryRequestContextBuilder

Bases: RequestContextBuilder

Source code in src/ant_ai/a2a/context_builder.py
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
class HistoryRequestContextBuilder(RequestContextBuilder):
    def __init__(
        self,
        should_populate_referred_tasks: bool = True,
        task_store: TaskStore | None = None,
        task_id_generator: IDGenerator | None = None,
        context_id_generator: IDGenerator | None = None,
    ) -> None:
        """Initializes the HistoryRequestContextBuilder.

        Args:
            should_populate_referred_tasks: If True (should always be True), the builder will fetch tasks
                referenced in `params.message.reference_task_ids` and populate the
                `related_tasks` field in the RequestContext. Kept to match interface of SimpleRequestContextBuilder. Defaults to True.
            task_store: The TaskStore instance to use for fetching referred tasks.
                Required if `should_populate_referred_tasks` is True.
            task_id_generator: ID generator for new task IDs. Defaults to None.
            context_id_generator: ID generator for new context IDs. Defaults to None.
        """
        self._task_store: TaskStore | None = task_store
        self._should_populate_referred_tasks: bool = should_populate_referred_tasks
        self._task_id_generator: IDGenerator | None = task_id_generator
        self._context_id_generator: IDGenerator | None = context_id_generator

    @override
    async def build(
        self,
        context: ServerCallContext,
        params: SendMessageRequest | None = None,
        task_id: str | None = None,
        context_id: str | None = None,
        task: Task | None = None,
    ) -> RequestContext:
        """Builds the request context for an agent execution.
        This method assembles the RequestContext object.

        Args:
            params: The parameters of the incoming message send request.
            task_id: The ID of the task being executed.
            context_id: The ID of the current execution context.
            task: The primary task object associated with the request.
            context: The server call context, containing metadata about the call.

        Returns:
            An instance of RequestContext populated with the provided information
            and potentially a list of related tasks.
        """

        related_tasks: list[Task] | None = None

        if (
            self._task_store
            and self._should_populate_referred_tasks
            and params
            and params.message.reference_task_ids
        ):
            related_tasks: list[Task] = await self.collect_all_referenced_tasks(
                context, self._task_store, params.message.reference_task_ids
            )

        return RequestContext(
            call_context=context,
            request=params,
            task_id=task_id,
            context_id=context_id,
            task=task,
            related_tasks=related_tasks,
            task_id_generator=self._task_id_generator,
            context_id_generator=self._context_id_generator,
        )

    async def collect_all_referenced_tasks(
        self,
        context: ServerCallContext,
        task_store: TaskStore,
        initial_ids: _containers.RepeatedScalarFieldContainer[str],
        visited: set[str] | None = None,
    ) -> list[Task]:
        """
        Recursively collects all tasks referenced by the given list of task IDs.
        This method performs a breadth-first search through the task graph, starting from the initial task IDs,
        and collects all tasks that are directly or indirectly referenced.

        Args:
            context: The server call context containing user information and other relevant
            task_store: The task store to retrieve tasks from given their IDs.
            initial_ids: The list of initial task IDs to start the search from.
            visited: A set of task IDs that have already been visited during the search to avoid cycles.
                This is used internally during the recursion and should not be provided by the caller.

        Returns:
            A list of Task objects that are referenced by the initial task IDs, including the tasks corresponding to the initial IDs themselves.
        """

        visited: set[str] = set() if visited is None else visited

        tasks: list[Task] = []
        queue: list[str] = [tid for tid in initial_ids if tid not in visited]

        while queue:
            batch: list[str] = list(dict.fromkeys(queue))
            queue: list[str] = []

            results: list[Task | None] = await asyncio.gather(
                *(task_store.get(tid, context=context) for tid in batch),
            )

            for item in results:
                if not isinstance(item, Task):
                    continue

                if item.id in visited:
                    continue

                visited.add(item.id)
                tasks.append(item)

                for msg in item.history or ():
                    queue.extend(msg.reference_task_ids or ())

            queue: list[str] = [tid for tid in queue if tid not in visited]

        return tasks[::-1]  # reverse to maintain original order

__init__

__init__(
    should_populate_referred_tasks: bool = True,
    task_store: TaskStore | None = None,
    task_id_generator: IDGenerator | None = None,
    context_id_generator: IDGenerator | None = None,
) -> None

Initializes the HistoryRequestContextBuilder.

Parameters:

Name Type Description Default
should_populate_referred_tasks bool

If True (should always be True), the builder will fetch tasks referenced in params.message.reference_task_ids and populate the related_tasks field in the RequestContext. Kept to match interface of SimpleRequestContextBuilder. Defaults to True.

True
task_store TaskStore | None

The TaskStore instance to use for fetching referred tasks. Required if should_populate_referred_tasks is True.

None
task_id_generator IDGenerator | None

ID generator for new task IDs. Defaults to None.

None
context_id_generator IDGenerator | None

ID generator for new context IDs. Defaults to None.

None
Source code in src/ant_ai/a2a/context_builder.py
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
def __init__(
    self,
    should_populate_referred_tasks: bool = True,
    task_store: TaskStore | None = None,
    task_id_generator: IDGenerator | None = None,
    context_id_generator: IDGenerator | None = None,
) -> None:
    """Initializes the HistoryRequestContextBuilder.

    Args:
        should_populate_referred_tasks: If True (should always be True), the builder will fetch tasks
            referenced in `params.message.reference_task_ids` and populate the
            `related_tasks` field in the RequestContext. Kept to match interface of SimpleRequestContextBuilder. Defaults to True.
        task_store: The TaskStore instance to use for fetching referred tasks.
            Required if `should_populate_referred_tasks` is True.
        task_id_generator: ID generator for new task IDs. Defaults to None.
        context_id_generator: ID generator for new context IDs. Defaults to None.
    """
    self._task_store: TaskStore | None = task_store
    self._should_populate_referred_tasks: bool = should_populate_referred_tasks
    self._task_id_generator: IDGenerator | None = task_id_generator
    self._context_id_generator: IDGenerator | None = context_id_generator

build async

build(
    context: ServerCallContext,
    params: SendMessageRequest | None = None,
    task_id: str | None = None,
    context_id: str | None = None,
    task: Task | None = None,
) -> RequestContext

Builds the request context for an agent execution. This method assembles the RequestContext object.

Parameters:

Name Type Description Default
params SendMessageRequest | None

The parameters of the incoming message send request.

None
task_id str | None

The ID of the task being executed.

None
context_id str | None

The ID of the current execution context.

None
task Task | None

The primary task object associated with the request.

None
context ServerCallContext

The server call context, containing metadata about the call.

required

Returns:

Type Description
RequestContext

An instance of RequestContext populated with the provided information

RequestContext

and potentially a list of related tasks.

Source code in src/ant_ai/a2a/context_builder.py
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
@override
async def build(
    self,
    context: ServerCallContext,
    params: SendMessageRequest | None = None,
    task_id: str | None = None,
    context_id: str | None = None,
    task: Task | None = None,
) -> RequestContext:
    """Builds the request context for an agent execution.
    This method assembles the RequestContext object.

    Args:
        params: The parameters of the incoming message send request.
        task_id: The ID of the task being executed.
        context_id: The ID of the current execution context.
        task: The primary task object associated with the request.
        context: The server call context, containing metadata about the call.

    Returns:
        An instance of RequestContext populated with the provided information
        and potentially a list of related tasks.
    """

    related_tasks: list[Task] | None = None

    if (
        self._task_store
        and self._should_populate_referred_tasks
        and params
        and params.message.reference_task_ids
    ):
        related_tasks: list[Task] = await self.collect_all_referenced_tasks(
            context, self._task_store, params.message.reference_task_ids
        )

    return RequestContext(
        call_context=context,
        request=params,
        task_id=task_id,
        context_id=context_id,
        task=task,
        related_tasks=related_tasks,
        task_id_generator=self._task_id_generator,
        context_id_generator=self._context_id_generator,
    )

collect_all_referenced_tasks async

collect_all_referenced_tasks(
    context: ServerCallContext,
    task_store: TaskStore,
    initial_ids: RepeatedScalarFieldContainer[str],
    visited: set[str] | None = None,
) -> list[Task]

Recursively collects all tasks referenced by the given list of task IDs. This method performs a breadth-first search through the task graph, starting from the initial task IDs, and collects all tasks that are directly or indirectly referenced.

Parameters:

Name Type Description Default
context ServerCallContext

The server call context containing user information and other relevant

required
task_store TaskStore

The task store to retrieve tasks from given their IDs.

required
initial_ids RepeatedScalarFieldContainer[str]

The list of initial task IDs to start the search from.

required
visited set[str] | None

A set of task IDs that have already been visited during the search to avoid cycles. This is used internally during the recursion and should not be provided by the caller.

None

Returns:

Type Description
list[Task]

A list of Task objects that are referenced by the initial task IDs, including the tasks corresponding to the initial IDs themselves.

Source code in src/ant_ai/a2a/context_builder.py
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
async def collect_all_referenced_tasks(
    self,
    context: ServerCallContext,
    task_store: TaskStore,
    initial_ids: _containers.RepeatedScalarFieldContainer[str],
    visited: set[str] | None = None,
) -> list[Task]:
    """
    Recursively collects all tasks referenced by the given list of task IDs.
    This method performs a breadth-first search through the task graph, starting from the initial task IDs,
    and collects all tasks that are directly or indirectly referenced.

    Args:
        context: The server call context containing user information and other relevant
        task_store: The task store to retrieve tasks from given their IDs.
        initial_ids: The list of initial task IDs to start the search from.
        visited: A set of task IDs that have already been visited during the search to avoid cycles.
            This is used internally during the recursion and should not be provided by the caller.

    Returns:
        A list of Task objects that are referenced by the initial task IDs, including the tasks corresponding to the initial IDs themselves.
    """

    visited: set[str] = set() if visited is None else visited

    tasks: list[Task] = []
    queue: list[str] = [tid for tid in initial_ids if tid not in visited]

    while queue:
        batch: list[str] = list(dict.fromkeys(queue))
        queue: list[str] = []

        results: list[Task | None] = await asyncio.gather(
            *(task_store.get(tid, context=context) for tid in batch),
        )

        for item in results:
            if not isinstance(item, Task):
                continue

            if item.id in visited:
                continue

            visited.add(item.id)
            tasks.append(item)

            for msg in item.history or ():
                queue.extend(msg.reference_task_ids or ())

        queue: list[str] = [tid for tid in queue if tid not in visited]

    return tasks[::-1]  # reverse to maintain original order