LiteLLM manages:
In production, litellm supports using Redis as a way to track cooldown server and usage (managing tpm/rpm limits).
Load Balancing(s/o @paulpierre and sweep proxy for their contributions to this implementation) See Code
Quick StartLoadbalance across multiple azure/bedrock/provider deployments. LiteLLM will handle retrying in different regions if a call fails.
from litellm import Router
model_list = [{
"model_name": "gpt-3.5-turbo",
"litellm_params": {
"model": "azure/chatgpt-v-2",
"api_key": os.getenv("AZURE_API_KEY"),
"api_version": os.getenv("AZURE_API_VERSION"),
"api_base": os.getenv("AZURE_API_BASE")
}
}, {
"model_name": "gpt-3.5-turbo",
"litellm_params": {
"model": "azure/chatgpt-functioncalling",
"api_key": os.getenv("AZURE_API_KEY"),
"api_version": os.getenv("AZURE_API_VERSION"),
"api_base": os.getenv("AZURE_API_BASE")
}
}, {
"model_name": "gpt-3.5-turbo",
"litellm_params": {
"model": "gpt-3.5-turbo",
"api_key": os.getenv("OPENAI_API_KEY"),
}
}, {
"model_name": "gpt-4",
"litellm_params": {
"model": "azure/gpt-4",
"api_key": os.getenv("AZURE_API_KEY"),
"api_base": os.getenv("AZURE_API_BASE"),
"api_version": os.getenv("AZURE_API_VERSION"),
}
}, {
"model_name": "gpt-4",
"litellm_params": {
"model": "gpt-4",
"api_key": os.getenv("OPENAI_API_KEY"),
}
},
]
router = Router(model_list=model_list)
response = await router.acompletion(model="gpt-3.5-turbo",
messages=[{"role": "user", "content": "Hey, how's it going?"}])
print(response)
response = await router.acompletion(model="gpt-4",
messages=[{"role": "user", "content": "Hey, how's it going?"}])
print(response)
info
See detailed proxy loadbalancing/fallback docs here
model_list:
- model_name: gpt-3.5-turbo
litellm_params:
model: azure/<your-deployment-name>
api_base: <your-azure-endpoint>
api_key: <your-azure-api-key>
- model_name: gpt-3.5-turbo
litellm_params:
model: azure/gpt-turbo-small-ca
api_base: https://my-endpoint-canada-berri992.openai.azure.com/
api_key: <your-azure-api-key>
- model_name: gpt-3.5-turbo
litellm_params:
model: azure/gpt-turbo-large
api_base: https://openai-france-1234.openai.azure.com/
api_key: <your-azure-api-key>
litellm --config /path/to/config.yaml
curl -X POST 'http://0.0.0.0:4000/chat/completions' \
-H 'Content-Type: application/json' \
-H 'Authorization: Bearer sk-1234' \
-d '{
"model": "gpt-3.5-turbo",
"messages": [
{"role": "user", "content": "Hi there!"}
],
"mock_testing_rate_limit_error": true
}'
Available Endpoints
router.completion()
- chat completions endpoint to call 100+ LLMsrouter.acompletion()
- async chat completion callsrouter.embedding()
- embedding endpoint for Azure, OpenAI, Huggingface endpointsrouter.aembedding()
- async embeddings callsrouter.text_completion()
- completion calls in the old OpenAI /v1/completions
endpoint formatrouter.atext_completion()
- async text completion callsrouter.image_generation()
- completion calls in OpenAI /v1/images/generations
endpoint formatrouter.aimage_generation()
- async image generation callsRouter provides 4 strategies for routing your calls across multiple deployments:
🎉 NEW This is an async implementation of usage-based-routing.
Filters out deployment if tpm/rpm limit exceeded - If you pass in the deployment's tpm/rpm limits.
Routes to deployment with lowest TPM usage for that minute.
In production, we use Redis to track usage (TPM/RPM) across multiple deployments. This implementation uses async redis calls (redis.incr and redis.mget).
For Azure, you get 6 RPM per 1000 TPM
from litellm import Router
model_list = [{
"model_name": "gpt-3.5-turbo",
"litellm_params": {
"model": "azure/chatgpt-v-2",
"api_key": os.getenv("AZURE_API_KEY"),
"api_version": os.getenv("AZURE_API_VERSION"),
"api_base": os.getenv("AZURE_API_BASE")
"tpm": 100000,
"rpm": 10000,
},
}, {
"model_name": "gpt-3.5-turbo",
"litellm_params": {
"model": "azure/chatgpt-functioncalling",
"api_key": os.getenv("AZURE_API_KEY"),
"api_version": os.getenv("AZURE_API_VERSION"),
"api_base": os.getenv("AZURE_API_BASE")
"tpm": 100000,
"rpm": 1000,
},
}, {
"model_name": "gpt-3.5-turbo",
"litellm_params": {
"model": "gpt-3.5-turbo",
"api_key": os.getenv("OPENAI_API_KEY"),
"tpm": 100000,
"rpm": 1000,
},
}]
router = Router(model_list=model_list,
redis_host=os.environ["REDIS_HOST"],
redis_password=os.environ["REDIS_PASSWORD"],
redis_port=os.environ["REDIS_PORT"],
routing_strategy="usage-based-routing-v2"
enable_pre_call_checks=True,
)
response = await router.acompletion(model="gpt-3.5-turbo",
messages=[{"role": "user", "content": "Hey, how's it going?"}]
print(response)
1. Set strategy in config
model_list:
- model_name: gpt-3.5-turbo
litellm_params:
model: azure/chatgpt-v-2
api_key: os.environ/AZURE_API_KEY
api_version: os.environ/AZURE_API_VERSION
api_base: os.environ/AZURE_API_BASE
tpm: 100000
rpm: 10000
- model_name: gpt-3.5-turbo
litellm_params:
model: gpt-3.5-turbo
api_key: os.getenv(OPENAI_API_KEY)
tpm: 100000
rpm: 1000
router_settings:
routing_strategy: usage-based-routing-v2
redis_host: <your-redis-host>
redis_password: <your-redis-password>
redis_port: <your-redis-port>
enable_pre_call_check: true
general_settings:
master_key: sk-1234
2. Start proxy
litellm --config /path/to/config.yaml
3. Test it!
curl --location 'http://localhost:4000/v1/chat/completions' \
--header 'Content-Type: application/json' \
--header 'Authorization: Bearer sk-1234' \
--data '{
"model": "gpt-3.5-turbo",
"messages": [{"role": "user", "content": "Hey, how's it going?"}]
}'
Picks the deployment with the lowest response time.
It caches, and updates the response times for deployments based on when a request was sent and received from a deployment.
from litellm import Router
import asyncio
model_list = [{ ... }]
router = Router(model_list=model_list,
routing_strategy="latency-based-routing",
enable_pre_call_check=True,
)
tasks = []
response = None
final_response = None
for _ in range(2):
tasks.append(router.acompletion(model=model, messages=messages))
response = await asyncio.gather(*tasks)
if response is not None:
await asyncio.sleep(1)
picked_deployment = router.lowestlatency_logger.get_available_deployments(
model_group=model, healthy_deployments=router.healthy_deployments
)
final_response = await router.acompletion(model=model, messages=messages)
print(f"min deployment id: {picked_deployment}")
print(f"model id: {final_response._hidden_params['model_id']}")
assert (
final_response._hidden_params["model_id"]
== picked_deployment["model_info"]["id"]
)
Set Time Window
Set time window for how far back to consider when averaging latency for a deployment.
In Router
router = Router(..., routing_strategy_args={"ttl": 10})
In Proxy
router_settings:
routing_strategy_args: {"ttl": 10}
Set Lowest Latency Buffer
Set a buffer within which deployments are candidates for making calls to.
E.g.
if you have 5 deployments
https://litellm-prod-1.openai.azure.com/: 0.07s
https://litellm-prod-2.openai.azure.com/: 0.1s
https://litellm-prod-3.openai.azure.com/: 0.1s
https://litellm-prod-4.openai.azure.com/: 0.1s
https://litellm-prod-5.openai.azure.com/: 4.66s
to prevent initially overloading prod-1
, with all requests - we can set a buffer of 50%, to consider deployments prod-2, prod-3, prod-4
.
In Router
router = Router(..., routing_strategy_args={"lowest_latency_buffer": 0.5})
In Proxy
router_settings:
routing_strategy_args: {"lowest_latency_buffer": 0.5}
Default Picks a deployment based on the provided Requests per minute (rpm) or Tokens per minute (tpm)
If rpm
or tpm
is not provided, it randomly picks a deployment
You can also set a weight
param, to specify which model should get picked when.
model_list:
- model_name: gpt-3.5-turbo
litellm_params:
model: azure/chatgpt-v-2
api_key: os.environ/AZURE_API_KEY
api_version: os.environ/AZURE_API_VERSION
api_base: os.environ/AZURE_API_BASE
rpm: 900
- model_name: gpt-3.5-turbo
litellm_params:
model: azure/chatgpt-functioncalling
api_key: os.environ/AZURE_API_KEY
api_version: os.environ/AZURE_API_VERSION
api_base: os.environ/AZURE_API_BASE
rpm: 10
Python SDK
from litellm import Router
import asyncio
model_list = [{
"model_name": "gpt-3.5-turbo",
"litellm_params": {
"model": "azure/chatgpt-v-2",
"api_key": os.getenv("AZURE_API_KEY"),
"api_version": os.getenv("AZURE_API_VERSION"),
"api_base": os.getenv("AZURE_API_BASE"),
"rpm": 900,
}
}, {
"model_name": "gpt-3.5-turbo",
"litellm_params": {
"model": "azure/chatgpt-functioncalling",
"api_key": os.getenv("AZURE_API_KEY"),
"api_version": os.getenv("AZURE_API_VERSION"),
"api_base": os.getenv("AZURE_API_BASE"),
"rpm": 10,
}
},]
router = Router(model_list=model_list, routing_strategy="simple-shuffle")
async def router_acompletion():
response = await router.acompletion(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": "Hey, how's it going?"}]
)
print(response)
return response
asyncio.run(router_acompletion())
LiteLLM Proxy Config.yaml
model_list:
- model_name: gpt-3.5-turbo
litellm_params:
model: azure/chatgpt-v-2
api_key: os.environ/AZURE_API_KEY
api_version: os.environ/AZURE_API_VERSION
api_base: os.environ/AZURE_API_BASE
weight: 9
- model_name: gpt-3.5-turbo
litellm_params:
model: azure/chatgpt-functioncalling
api_key: os.environ/AZURE_API_KEY
api_version: os.environ/AZURE_API_VERSION
api_base: os.environ/AZURE_API_BASE
weight: 1
Python SDK
from litellm import Router
import asyncio
model_list = [{
"model_name": "gpt-3.5-turbo",
"litellm_params": {
"model": "azure/chatgpt-v-2",
"api_key": os.getenv("AZURE_API_KEY"),
"api_version": os.getenv("AZURE_API_VERSION"),
"api_base": os.getenv("AZURE_API_BASE"),
"weight": 9,
}
}, {
"model_name": "gpt-3.5-turbo",
"litellm_params": {
"model": "azure/chatgpt-functioncalling",
"api_key": os.getenv("AZURE_API_KEY"),
"api_version": os.getenv("AZURE_API_VERSION"),
"api_base": os.getenv("AZURE_API_BASE"),
"weight": 1,
}
}]
router = Router(model_list=model_list, routing_strategy="simple-shuffle")
async def router_acompletion():
response = await router.acompletion(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": "Hey, how's it going?"}]
)
print(response)
return response
asyncio.run(router_acompletion())
This will route to the deployment with the lowest TPM usage for that minute.
In production, we use Redis to track usage (TPM/RPM) across multiple deployments.
If you pass in the deployment's tpm/rpm limits, this will also check against that, and filter out any who's limits would be exceeded.
For Azure, your RPM = TPM/6.
from litellm import Router
model_list = [{
"model_name": "gpt-3.5-turbo",
"litellm_params": {
"model": "azure/chatgpt-v-2",
"api_key": os.getenv("AZURE_API_KEY"),
"api_version": os.getenv("AZURE_API_VERSION"),
"api_base": os.getenv("AZURE_API_BASE")
},
"tpm": 100000,
"rpm": 10000,
}, {
"model_name": "gpt-3.5-turbo",
"litellm_params": {
"model": "azure/chatgpt-functioncalling",
"api_key": os.getenv("AZURE_API_KEY"),
"api_version": os.getenv("AZURE_API_VERSION"),
"api_base": os.getenv("AZURE_API_BASE")
},
"tpm": 100000,
"rpm": 1000,
}, {
"model_name": "gpt-3.5-turbo",
"litellm_params": {
"model": "gpt-3.5-turbo",
"api_key": os.getenv("OPENAI_API_KEY"),
},
"tpm": 100000,
"rpm": 1000,
}]
router = Router(model_list=model_list,
redis_host=os.environ["REDIS_HOST"],
redis_password=os.environ["REDIS_PASSWORD"],
redis_port=os.environ["REDIS_PORT"],
routing_strategy="usage-based-routing"
enable_pre_call_check=True,
)
response = await router.acompletion(model="gpt-3.5-turbo",
messages=[{"role": "user", "content": "Hey, how's it going?"}]
print(response)
Picks a deployment with the least number of ongoing calls, it's handling.
from litellm import Router
import asyncio
model_list = [{
"model_name": "gpt-3.5-turbo",
"litellm_params": {
"model": "azure/chatgpt-v-2",
"api_key": os.getenv("AZURE_API_KEY"),
"api_version": os.getenv("AZURE_API_VERSION"),
"api_base": os.getenv("AZURE_API_BASE"),
}
}, {
"model_name": "gpt-3.5-turbo",
"litellm_params": {
"model": "azure/chatgpt-functioncalling",
"api_key": os.getenv("AZURE_API_KEY"),
"api_version": os.getenv("AZURE_API_VERSION"),
"api_base": os.getenv("AZURE_API_BASE"),
}
}, {
"model_name": "gpt-3.5-turbo",
"litellm_params": {
"model": "gpt-3.5-turbo",
"api_key": os.getenv("OPENAI_API_KEY"),
}
}]
router = Router(model_list=model_list, routing_strategy="least-busy")
async def router_acompletion():
response = await router.acompletion(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": "Hey, how's it going?"}]
)
print(response)
return response
asyncio.run(router_acompletion())
Plugin a custom routing strategy to select deployments
Step 1. Define your custom routing strategy
from litellm.router import CustomRoutingStrategyBase
class CustomRoutingStrategy(CustomRoutingStrategyBase):
async def async_get_available_deployment(
self,
model: str,
messages: Optional[List[Dict[str, str]]] = None,
input: Optional[Union[str, List]] = None,
specific_deployment: Optional[bool] = False,
request_kwargs: Optional[Dict] = None,
):
"""
Asynchronously retrieves the available deployment based on the given parameters.
Args:
model (str): The name of the model.
messages (Optional[List[Dict[str, str]]], optional): The list of messages for a given request. Defaults to None.
input (Optional[Union[str, List]], optional): The input for a given embedding request. Defaults to None.
specific_deployment (Optional[bool], optional): Whether to retrieve a specific deployment. Defaults to False.
request_kwargs (Optional[Dict], optional): Additional request keyword arguments. Defaults to None.
Returns:
Returns an element from litellm.router.model_list
"""
print("In CUSTOM async get available deployment")
model_list = router.model_list
print("router model list=", model_list)
for model in model_list:
if isinstance(model, dict):
if model["litellm_params"]["model"] == "openai/very-special-endpoint":
return model
pass
def get_available_deployment(
self,
model: str,
messages: Optional[List[Dict[str, str]]] = None,
input: Optional[Union[str, List]] = None,
specific_deployment: Optional[bool] = False,
request_kwargs: Optional[Dict] = None,
):
"""
Synchronously retrieves the available deployment based on the given parameters.
Args:
model (str): The name of the model.
messages (Optional[List[Dict[str, str]]], optional): The list of messages for a given request. Defaults to None.
input (Optional[Union[str, List]], optional): The input for a given embedding request. Defaults to None.
specific_deployment (Optional[bool], optional): Whether to retrieve a specific deployment. Defaults to False.
request_kwargs (Optional[Dict], optional): Additional request keyword arguments. Defaults to None.
Returns:
Returns an element from litellm.router.model_list
"""
pass
Step 2. Initialize Router with custom routing strategy
from litellm import Router
router = Router(
model_list=[
{
"model_name": "azure-model",
"litellm_params": {
"model": "openai/very-special-endpoint",
"api_base": "https://exampleopenaiendpoint-production.up.railway.app/",
"api_key": "fake-key",
},
"model_info": {"id": "very-special-endpoint"},
},
{
"model_name": "azure-model",
"litellm_params": {
"model": "openai/fast-endpoint",
"api_base": "https://exampleopenaiendpoint-production.up.railway.app/",
"api_key": "fake-key",
},
"model_info": {"id": "fast-endpoint"},
},
],
set_verbose=True,
debug_level="DEBUG",
timeout=1,
)
router.set_custom_routing_strategy(CustomRoutingStrategy())
Step 3. Test your routing strategy. Expect your custom routing strategy to be called when running router.acompletion
requests
for _ in range(10):
response = await router.acompletion(
model="azure-model", messages=[{"role": "user", "content": "hello"}]
)
print(response)
_picked_model_id = response._hidden_params["model_id"]
print("picked model=", _picked_model_id)
Picks a deployment based on the lowest cost
How this works:
rpm/tpm
limitslitellm_param["model"]
exists in litellm_model_cost_map
litellm_model_cost_map
-> use deployment_cost= $1
from litellm import Router
import asyncio
model_list = [
{
"model_name": "gpt-3.5-turbo",
"litellm_params": {"model": "gpt-4"},
"model_info": {"id": "openai-gpt-4"},
},
{
"model_name": "gpt-3.5-turbo",
"litellm_params": {"model": "groq/llama3-8b-8192"},
"model_info": {"id": "groq-llama"},
},
]
router = Router(model_list=model_list, routing_strategy="cost-based-routing")
async def router_acompletion():
response = await router.acompletion(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": "Hey, how's it going?"}]
)
print(response)
print(response._hidden_params["model_id"])
return response
asyncio.run(router_acompletion())
Using Custom Input/Output pricing
Set litellm_params["input_cost_per_token"]
and litellm_params["output_cost_per_token"]
for using custom pricing when routing
model_list = [
{
"model_name": "gpt-3.5-turbo",
"litellm_params": {
"model": "azure/chatgpt-v-2",
"input_cost_per_token": 0.00003,
"output_cost_per_token": 0.00003,
},
"model_info": {"id": "chatgpt-v-experimental"},
},
{
"model_name": "gpt-3.5-turbo",
"litellm_params": {
"model": "azure/chatgpt-v-1",
"input_cost_per_token": 0.000000001,
"output_cost_per_token": 0.00000001,
},
"model_info": {"id": "chatgpt-v-1"},
},
{
"model_name": "gpt-3.5-turbo",
"litellm_params": {
"model": "azure/chatgpt-v-5",
"input_cost_per_token": 10,
"output_cost_per_token": 12,
},
"model_info": {"id": "chatgpt-v-5"},
},
]
router = Router(model_list=model_list, routing_strategy="cost-based-routing")
async def router_acompletion():
response = await router.acompletion(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": "Hey, how's it going?"}]
)
print(response)
print(response._hidden_params["model_id"])
return response
asyncio.run(router_acompletion())
Basic Reliability Weighted Deployments
Set weight
on a deployment to pick one deployment more often than others.
This works across simple-shuffle routing strategy (this is the default, if no routing strategy is selected).
from litellm import Router
model_list = [
{
"model_name": "o1",
"litellm_params": {
"model": "o1-preview",
"api_key": os.getenv("OPENAI_API_KEY"),
"weight": 1
},
},
{
"model_name": "o1",
"litellm_params": {
"model": "o1-preview",
"api_key": os.getenv("OPENAI_API_KEY"),
"weight": 2
},
},
]
router = Router(model_list=model_list, routing_strategy="cost-based-routing")
response = await router.acompletion(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": "Hey, how's it going?"}]
)
print(response)
model_list:
- model_name: o1
litellm_params:
model: o1
api_key: os.environ/OPENAI_API_KEY
weight: 1
- model_name: o1
litellm_params:
model: o1-preview
api_key: os.environ/OPENAI_API_KEY
weight: 2
Max Parallel Requests (ASYNC)
Used in semaphore for async requests on router. Limit the max concurrent calls made to a deployment. Useful in high-traffic scenarios.
If tpm/rpm is set, and no max parallel request limit given, we use the RPM or calculated RPM (tpm/1000/6) as the max parallel request limit.
from litellm import Router
model_list = [{
"model_name": "gpt-4",
"litellm_params": {
"model": "azure/gpt-4",
...
"max_parallel_requests": 10
}
}]
router = Router(model_list=model_list, default_max_parallel_requests=20)
Cooldowns
Set the limit for how many calls a model is allowed to fail in a minute, before being cooled down for a minute.
from litellm import Router
model_list = [{...}]
router = Router(model_list=model_list,
allowed_fails=1,
cooldown_time=100
)
user_message = "Hello, whats the weather in San Francisco??"
messages = [{"content": user_message, "role": "user"}]
response = router.completion(model="gpt-3.5-turbo", messages=messages)
print(f"response: {response}")
Set Global Value
router_settings:
allowed_fails: 3
cooldown_time: 30
Defaults:
DEFAULT_COOLDOWN_TIME_SECONDS
in constants.py)Set Per Model
model_list:
- model_name: fake-openai-endpoint
litellm_params:
model: predibase/llama-3-8b-instruct
api_key: os.environ/PREDIBASE_API_KEY
tenant_id: os.environ/PREDIBASE_TENANT_ID
max_new_tokens: 256
cooldown_time: 0
Expected Response
No deployments available for selected model, Try again in 60 seconds. Passed model=claude-3-5-sonnet. pre-call-checks=False, allowed_model_region=n/a.
Disable cooldowns
from litellm import Router
router = Router(..., disable_cooldowns=True)
router_settings:
disable_cooldowns: True
How Cooldowns Work
Cooldowns apply to individual deployments, not entire model groups. The router isolates failures to specific deployments while keeping healthy alternatives available.
What is a deployment?A deployment is a single entry in your config.yaml
model list. Each deployment represents a unique configuration with its own litellm_params
.
LiteLLM generates a unique model_id
for each deployment by creating a deterministic hash of all the litellm_params
. This allows the router to track and manage each deployment independently.
Example: Multiple deployments for the same model
Load Balancing config.yaml
model_list:
- model_name: sonnet-4
litellm_params:
model: anthropic/claude-sonnet-4-20250514
api_key: <our-real-key>
- model_name: byok-sonnet-4
litellm_params:
model: anthropic/claude-sonnet-4-20250514
api_key: <customer-managed-key>
api_base: https://proxy.litellm.ai/api.anthropic.com
- model_name: sonnet-4
litellm_params:
model: vertex_ai/claude-sonnet-4-20250514
vertex_project: my-project
Each deployment gets a unique model_id
(e.g., 1234567890
, 9129922
, 4982929292
) that the router uses for tracking health and cooldown status.
The router automatically cools down deployments based on the following conditions:
Condition Trigger Cooldown Duration Rate Limiting (429) Immediate on 429 response 5 seconds (default) High Failure Rate >50% failures in current minute 5 seconds (default) Non-Retryable Errors 401 (Auth), 404 (Not Found), 408 (Timeout) 5 seconds (default)During cooldown, the specific deployment is temporarily removed from the available pool, while other healthy deployments continue serving requests.
Cooldown RecoveryDeployments automatically recover from cooldown after the cooldown period expires. The router will:
Consider this high-availability setup with multiple providers:
Load Balancing config.yaml
model_list:
- model_name: sonnet-4
litellm_params:
model: anthropic/claude-sonnet-4-20250514
api_key: <anthropic-key>
- model_name: byok-sonnet-4
litellm_params:
model: anthropic/claude-sonnet-4-20250514
api_key: <customer-managed-key>
api_base: https://proxy.litellm.ai/api.anthropic.com
- model_name: sonnet-4
litellm_params:
model: vertex_ai/claude-sonnet-4-20250514
vertex_project: my-project
Failure Scenario:
RetriesFor both async + sync functions, we support retrying failed requests.
For RateLimitError we implement exponential backoffs
For generic errors, we retry immediately
Here's a quick look at how we can set num_retries = 3
:
from litellm import Router
model_list = [{...}]
router = Router(model_list=model_list,
num_retries=3)
user_message = "Hello, whats the weather in San Francisco??"
messages = [{"content": user_message, "role": "user"}]
response = router.completion(model="gpt-3.5-turbo", messages=messages)
print(f"response: {response}")
We also support setting minimum time to wait before retrying a failed request. This is via the retry_after
param.
from litellm import Router
model_list = [{...}]
router = Router(model_list=model_list,
num_retries=3, retry_after=5)
user_message = "Hello, whats the weather in San Francisco??"
messages = [{"content": user_message, "role": "user"}]
response = router.completion(model="gpt-3.5-turbo", messages=messages)
print(f"response: {response}")
[Advanced]: Custom Retries, Cooldowns based on Error Type
RetryPolicy
if you want to set a num_retries
based on the Exception receivedAllowedFailsPolicy
to set a custom number of allowed_fails
/minute before cooling down a deploymentExample:
retry_policy = RetryPolicy(
ContentPolicyViolationErrorRetries=3,
AuthenticationErrorRetries=0,
)
allowed_fails_policy = AllowedFailsPolicy(
ContentPolicyViolationErrorAllowedFails=1000,
RateLimitErrorAllowedFails=100,
)
Example Usage
from litellm.router import RetryPolicy, AllowedFailsPolicy
retry_policy = RetryPolicy(
ContentPolicyViolationErrorRetries=3,
AuthenticationErrorRetries=0,
BadRequestErrorRetries=1,
TimeoutErrorRetries=2,
RateLimitErrorRetries=3,
)
allowed_fails_policy = AllowedFailsPolicy(
ContentPolicyViolationErrorAllowedFails=1000,
RateLimitErrorAllowedFails=100,
)
router = litellm.Router(
model_list=[
{
"model_name": "gpt-3.5-turbo",
"litellm_params": {
"model": "azure/chatgpt-v-2",
"api_key": os.getenv("AZURE_API_KEY"),
"api_version": os.getenv("AZURE_API_VERSION"),
"api_base": os.getenv("AZURE_API_BASE"),
},
},
{
"model_name": "bad-model",
"litellm_params": {
"model": "azure/chatgpt-v-2",
"api_key": "bad-key",
"api_version": os.getenv("AZURE_API_VERSION"),
"api_base": os.getenv("AZURE_API_BASE"),
},
},
],
retry_policy=retry_policy,
allowed_fails_policy=allowed_fails_policy,
)
response = await router.acompletion(
model=model,
messages=messages,
)
router_settings:
retry_policy: {
"BadRequestErrorRetries": 3,
"ContentPolicyViolationErrorRetries": 4
}
allowed_fails_policy: {
"ContentPolicyViolationErrorAllowedFails": 1000,
"RateLimitErrorAllowedFails": 100
}
Caching
In production, we recommend using a Redis cache. For quickly testing things locally, we also support simple in-memory caching.
In-memory Cache
router = Router(model_list=model_list,
cache_responses=True)
print(response)
Redis Cache
router = Router(model_list=model_list,
redis_host=os.getenv("REDIS_HOST"),
redis_password=os.getenv("REDIS_PASSWORD"),
redis_port=os.getenv("REDIS_PORT"),
cache_responses=True)
print(response)
Pass in Redis URL, additional kwargs
router = Router(model_list: Optional[list] = None,
redis_url=os.getenv("REDIS_URL")",
cache_kwargs= {},
cache_responses=True)
Pre-Call Checks (Context Window, EU-Regions)
Enable pre-call checks to filter out:
1. Enable pre-call checks
from litellm import Router
router = Router(model_list=model_list, enable_pre_call_checks=True)
2. Set Model List
For context window checks on azure deployments, set the base model. Pick the base model from this list, all the azure models start with azure/
.
For 'eu-region' filtering, Set 'region_name' of deployment.
Note: We automatically infer region_name for Vertex AI, Bedrock, and IBM WatsonxAI based on your litellm params. For Azure, set litellm.enable_preview = True
.
model_list = [
{
"model_name": "gpt-3.5-turbo",
"litellm_params": {
"model": "azure/chatgpt-v-2",
"api_key": os.getenv("AZURE_API_KEY"),
"api_version": os.getenv("AZURE_API_VERSION"),
"api_base": os.getenv("AZURE_API_BASE"),
"region_name": "eu"
"base_model": "azure/gpt-35-turbo",
},
},
{
"model_name": "gpt-3.5-turbo",
"litellm_params": {
"model": "gpt-3.5-turbo-1106",
"api_key": os.getenv("OPENAI_API_KEY"),
},
},
{
"model_name": "gemini-pro",
"litellm_params: {
"model": "vertex_ai/gemini-pro-1.5",
"vertex_project": "adroit-crow-1234",
"vertex_location": "us-east1"
}
}
]
router = Router(model_list=model_list, enable_pre_call_checks=True)
3. Test it!
"""
- Give a gpt-3.5-turbo model group with different context windows (4k vs. 16k)
- Send a 5k prompt
- Assert it works
"""
from litellm import Router
import os
model_list = [
{
"model_name": "gpt-3.5-turbo",
"litellm_params": {
"model": "azure/chatgpt-v-2",
"api_key": os.getenv("AZURE_API_KEY"),
"api_version": os.getenv("AZURE_API_VERSION"),
"api_base": os.getenv("AZURE_API_BASE"),
"base_model": "azure/gpt-35-turbo",
},
"model_info": {
"base_model": "azure/gpt-35-turbo",
}
},
{
"model_name": "gpt-3.5-turbo",
"litellm_params": {
"model": "gpt-3.5-turbo-1106",
"api_key": os.getenv("OPENAI_API_KEY"),
},
},
]
router = Router(model_list=model_list, enable_pre_call_checks=True)
text = "What is the meaning of 42?" * 5000
response = router.completion(
model="gpt-3.5-turbo",
messages=[
{"role": "system", "content": text},
{"role": "user", "content": "Who was Alexander?"},
],
)
print(f"response: {response}")
"""
- Give 2 gpt-3.5-turbo deployments, in eu + non-eu regions
- Make a call
- Assert it picks the eu-region model
"""
from litellm import Router
import os
model_list = [
{
"model_name": "gpt-3.5-turbo",
"litellm_params": {
"model": "azure/chatgpt-v-2",
"api_key": os.getenv("AZURE_API_KEY"),
"api_version": os.getenv("AZURE_API_VERSION"),
"api_base": os.getenv("AZURE_API_BASE"),
"region_name": "eu"
},
"model_info": {
"id": "1"
}
},
{
"model_name": "gpt-3.5-turbo",
"litellm_params": {
"model": "gpt-3.5-turbo-1106",
"api_key": os.getenv("OPENAI_API_KEY"),
},
"model_info": {
"id": "2"
}
},
]
router = Router(model_list=model_list, enable_pre_call_checks=True)
response = router.completion(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": "Who was Alexander?"}],
)
print(f"response: {response}")
print(f"response id: {response._hidden_params['model_id']}")
info
Go here for how to do this on the proxy
Caching across model groupsIf you want to cache across 2 different model groups (e.g. azure deployments, and openai), use caching groups.
import litellm, asyncio, time
from litellm import Router
os.environ["OPENAI_API_KEY"] = ""
os.environ["AZURE_API_KEY"] = ""
os.environ["AZURE_API_BASE"] = ""
os.environ["AZURE_API_VERSION"] = ""
async def test_acompletion_caching_on_router_caching_groups():
try:
litellm.set_verbose = True
model_list = [
{
"model_name": "openai-gpt-3.5-turbo",
"litellm_params": {
"model": "gpt-3.5-turbo-0613",
"api_key": os.getenv("OPENAI_API_KEY"),
},
},
{
"model_name": "azure-gpt-3.5-turbo",
"litellm_params": {
"model": "azure/chatgpt-v-2",
"api_key": os.getenv("AZURE_API_KEY"),
"api_base": os.getenv("AZURE_API_BASE"),
"api_version": os.getenv("AZURE_API_VERSION")
},
}
]
messages = [
{"role": "user", "content": f"write a one sentence poem {time.time()}?"}
]
start_time = time.time()
router = Router(model_list=model_list,
cache_responses=True,
caching_groups=[("openai-gpt-3.5-turbo", "azure-gpt-3.5-turbo")])
response1 = await router.acompletion(model="openai-gpt-3.5-turbo", messages=messages, temperature=1)
print(f"response1: {response1}")
await asyncio.sleep(1)
response2 = await router.acompletion(model="azure-gpt-3.5-turbo", messages=messages, temperature=1)
assert response1.id == response2.id
assert len(response1.choices[0].message.content) > 0
assert response1.choices[0].message.content == response2.choices[0].message.content
except Exception as e:
traceback.print_exc()
asyncio.run(test_acompletion_caching_on_router_caching_groups())
Alerting 🚨
Send alerts to slack / your webhook url for the following events
Get a slack webhook url from https://api.slack.com/messaging/webhooks
UsageInitialize an AlertingConfig
and pass it to litellm.Router
. The following code will trigger an alert because api_key=bad-key
which is invalid
from litellm.router import AlertingConfig
import litellm
import os
router = litellm.Router(
model_list=[
{
"model_name": "gpt-3.5-turbo",
"litellm_params": {
"model": "gpt-3.5-turbo",
"api_key": "bad_key",
},
}
],
alerting_config= AlertingConfig(
alerting_threshold=10,
webhook_url= os.getenv("SLACK_WEBHOOK_URL")
),
)
try:
await router.acompletion(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": "Hey, how's it going?"}],
)
except:
pass
Track cost for Azure Deployments
Problem: Azure returns gpt-4
in the response when azure/gpt-4-1106-preview
is used. This leads to inaccurate cost tracking
Solution ✅ : Set model_info["base_model"]
on your router init so litellm uses the correct model for calculating azure cost
Step 1. Router Setup
from litellm import Router
model_list = [
{
"model_name": "gpt-4-preview",
"litellm_params": {
"model": "azure/chatgpt-v-2",
"api_key": os.getenv("AZURE_API_KEY"),
"api_version": os.getenv("AZURE_API_VERSION"),
"api_base": os.getenv("AZURE_API_BASE")
},
"model_info": {
"base_model": "azure/gpt-4-1106-preview"
}
},
{
"model_name": "gpt-4-32k",
"litellm_params": {
"model": "azure/chatgpt-functioncalling",
"api_key": os.getenv("AZURE_API_KEY"),
"api_version": os.getenv("AZURE_API_VERSION"),
"api_base": os.getenv("AZURE_API_BASE")
},
"model_info": {
"base_model": "azure/gpt-4-32k"
}
}
]
router = Router(model_list=model_list)
Step 2. Access response_cost
in the custom callback, litellm calculates the response cost for you
import litellm
from litellm.integrations.custom_logger import CustomLogger
class MyCustomHandler(CustomLogger):
def log_success_event(self, kwargs, response_obj, start_time, end_time):
print(f"On Success")
response_cost = kwargs.get("response_cost")
print("response_cost=", response_cost)
customHandler = MyCustomHandler()
litellm.callbacks = [customHandler]
response = router.completion(
model="gpt-4-32k",
messages=[{ "role": "user", "content": "Hi who are you"}]
)
Default litellm.completion/embedding params
You can also set default params for litellm completion/embedding calls. Here's how to do that:
from litellm import Router
fallback_dict = {"gpt-3.5-turbo": "gpt-3.5-turbo-16k"}
router = Router(model_list=model_list,
default_litellm_params={"context_window_fallback_dict": fallback_dict})
user_message = "Hello, whats the weather in San Francisco??"
messages = [{"content": user_message, "role": "user"}]
response = router.completion(model="gpt-3.5-turbo", messages=messages)
print(f"response: {response}")
Custom Callbacks - Track API Key, API Endpoint, Model Used
If you need to track the api_key, api endpoint, model, custom_llm_provider used for each completion call, you can setup a custom callback
Usageimport litellm
from litellm.integrations.custom_logger import CustomLogger
class MyCustomHandler(CustomLogger):
def log_success_event(self, kwargs, response_obj, start_time, end_time):
print(f"On Success")
print("kwargs=", kwargs)
litellm_params= kwargs.get("litellm_params")
api_key = litellm_params.get("api_key")
api_base = litellm_params.get("api_base")
custom_llm_provider= litellm_params.get("custom_llm_provider")
response_cost = kwargs.get("response_cost")
print("api_key=", api_key)
print("api_base=", api_base)
print("custom_llm_provider=", custom_llm_provider)
print("response_cost=", response_cost)
def log_failure_event(self, kwargs, response_obj, start_time, end_time):
print(f"On Failure")
print("kwargs=")
customHandler = MyCustomHandler()
litellm.callbacks = [customHandler]
router = Router(model_list=model_list, routing_strategy="simple-shuffle")
response = router.completion(
model="gpt-3.5-turbo",
messages=[{ "role": "user", "content": "Hi who are you"}]
)
Deploy Router
If you want a server to load balance across different LLM APIs, use our LiteLLM Proxy Server
Debugging Router Basic DebuggingSet Router(set_verbose=True)
from litellm import Router
router = Router(
model_list=model_list,
set_verbose=True
)
Detailed Debugging
Set Router(set_verbose=True,debug_level="DEBUG")
from litellm import Router
router = Router(
model_list=model_list,
set_verbose=True,
debug_level="DEBUG"
)
Very Detailed Debugging
Set litellm.set_verbose=True
and Router(set_verbose=True,debug_level="DEBUG")
from litellm import Router
import litellm
litellm.set_verbose = True
router = Router(
model_list=model_list,
set_verbose=True,
debug_level="DEBUG"
)
Router General Settings Usage
router = Router(model_list=..., router_general_settings=RouterGeneralSettings(async_only_mode=True))
Spec
class RouterGeneralSettings(BaseModel):
async_only_mode: bool = Field(
default=False
)
pass_through_all_models: bool = Field(
default=False
)
RetroSearch is an open source project built by @garambo | Open a GitHub Issue
Search and Browse the WWW like it's 1997 | Search results from DuckDuckGo
HTML:
3.2
| Encoding:
UTF-8
| Version:
0.7.4