47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
class LiteLLMChat(ChatLLM):
    """LiteLLM-based chat model. Supports multiple endpoints via LiteLLM.

    The endpoint and credentials are read from the ``LITELLM_API_BASE`` and
    ``LITELLM_API_KEY`` environment variables on every call; per-instance
    overrides can be placed in ``default_params``.
    """

    def __init__(self, model: str) -> None:
        self.model: str = model
        # Extra kwargs merged into every completion call. Spread after the
        # built-in keys in _build_completion_kwargs, so entries here win
        # over the env-derived api_base/api_key.
        self.default_params: dict = {}

    @staticmethod
    def _to_litellm_messages(messages: list[Message]) -> list[dict[str, str]]:
        """Convert Message objects into LiteLLM-compatible dicts.

        The project-internal ``kind`` discriminator is stripped because the
        OpenAI-style message schema does not know it.
        """
        return [m.model_dump(exclude={"kind"}) for m in messages]

    def _build_completion_kwargs(
        self,
        messages: list[Message],
        *,
        tools: list | None = None,
        response_format: dict | type[BaseModel] | None = None,
        stream: bool = False,
    ) -> dict:
        """Build kwargs for LiteLLM completion/acompletion calls.

        Args:
            messages: Conversation history to send.
            tools: Optional tool/function specs; omitted when empty.
            response_format: Optional structured-output spec (dict or
                Pydantic model class); omitted when ``None``.
            stream: Whether to request a streaming response.

        Returns:
            A dict ready to be splatted into ``completion``/``acompletion``.
        """
        kwargs: dict = {
            "model": self.model,
            "messages": self._to_litellm_messages(messages),
            # None is acceptable here: LiteLLM treats a None api_base/api_key
            # as "use its own defaults / provider env vars".
            "api_base": os.getenv("LITELLM_API_BASE"),
            "api_key": os.getenv("LITELLM_API_KEY"),
            **self.default_params,
        }
        # Assigned after the default_params spread so callers cannot
        # accidentally override the stream flag for this specific call.
        kwargs["stream"] = stream
        if tools:
            kwargs["tools"] = tools
        if response_format is not None:
            kwargs["response_format"] = response_format
        return kwargs

    def invoke(
        self,
        messages: list[Message],
        *,
        ctx: InvocationContext | None = None,
        tools: list | None = None,
        response_format: dict | type[BaseModel] | None = None,
    ) -> ChatLLMResponse:
        """Synchronous, non-streaming completion call."""
        kwargs = self._build_completion_kwargs(
            messages,
            tools=tools,
            response_format=response_format,
        )
        return to_chatllm_response(completion(**kwargs))

    async def ainvoke(
        self,
        messages: list[Message],
        *,
        ctx: InvocationContext | None = None,
        tools: list | None = None,
        response_format: dict | type[BaseModel] | None = None,
    ) -> ChatLLMResponse:
        """Asynchronous, non-streaming completion call."""
        kwargs = self._build_completion_kwargs(
            messages,
            tools=tools,
            response_format=response_format,
        )
        return to_chatllm_response(await acompletion(**kwargs))

    def stream(
        self,
        messages: list[Message],
        *,
        ctx: InvocationContext | None = None,
        tools: list | None = None,
        response_format: dict | type[BaseModel] | None = None,
    ) -> AsyncIterator[ChatLLMStreamChunk]:
        """Stream assistant text deltas as ChatLLMStreamChunk objects.

        Returns an async iterator; the underlying request is only issued
        once iteration begins. Empty deltas are filtered out.
        """

        async def gen() -> AsyncIterator[ChatLLMStreamChunk]:
            kwargs = self._build_completion_kwargs(
                messages,
                tools=tools,
                response_format=response_format,
                stream=True,
            )
            response = await acompletion(**kwargs)
            async for chunk in response:
                # Some OpenAI-compatible backends emit chunks with an empty
                # `choices` list (e.g. the final usage-only chunk when
                # stream_options include_usage is set, or Azure's initial
                # content-filter chunk). Skip them instead of raising
                # IndexError on choices[0].
                if not chunk.choices:
                    continue
                delta = chunk.choices[0].delta.content or ""
                if not delta:
                    continue
                yield ChatLLMStreamChunk(
                    delta=MessageChunk(role="assistant", delta=delta)
                )

        return gen()