
Router - Load Balancing

LiteLLM's Router manages load balancing across multiple deployments, routing strategies, basic reliability (cooldowns, retries, timeouts), and caching.

In production, litellm supports using Redis to track deployment cooldowns and usage (managing tpm/rpm limits).
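As a quick illustration, here is a minimal sketch of constructing a Redis-backed Router (the model list is just a placeholder; the Redis parameters are the same ones used in the routing-strategy examples later on this page):

import os
from litellm import Router

# single-deployment list, purely for illustration; see the Quick Start below
model_list = [{
    "model_name": "gpt-3.5-turbo",
    "litellm_params": {
        "model": "gpt-3.5-turbo",
        "api_key": os.getenv("OPENAI_API_KEY"),
    },
}]

# Pointing the Router at Redis lets multiple litellm instances share
# cooldown and tpm/rpm usage state.
router = Router(
    model_list=model_list,
    redis_host=os.environ["REDIS_HOST"],
    redis_password=os.environ["REDIS_PASSWORD"],
    redis_port=os.environ["REDIS_PORT"],
)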

Load Balancing

(s/o @paulpierre and sweep proxy for their contributions to this implementation) See Code

Quick Start

Load balance across multiple Azure/Bedrock/provider deployments. LiteLLM will handle retrying in different regions if a call fails.

import os
from litellm import Router

model_list = [{
    "model_name": "gpt-3.5-turbo",  # model alias / group name
    "litellm_params": {             # params for litellm completion/embedding call
        "model": "azure/chatgpt-v-2",
        "api_key": os.getenv("AZURE_API_KEY"),
        "api_version": os.getenv("AZURE_API_VERSION"),
        "api_base": os.getenv("AZURE_API_BASE")
    }
}, {
    "model_name": "gpt-3.5-turbo",
    "litellm_params": {
        "model": "azure/chatgpt-functioncalling",
        "api_key": os.getenv("AZURE_API_KEY"),
        "api_version": os.getenv("AZURE_API_VERSION"),
        "api_base": os.getenv("AZURE_API_BASE")
    }
}, {
    "model_name": "gpt-3.5-turbo",
    "litellm_params": {
        "model": "gpt-3.5-turbo",
        "api_key": os.getenv("OPENAI_API_KEY"),
    }
}, {
    "model_name": "gpt-4",
    "litellm_params": {
        "model": "azure/gpt-4",
        "api_key": os.getenv("AZURE_API_KEY"),
        "api_base": os.getenv("AZURE_API_BASE"),
        "api_version": os.getenv("AZURE_API_VERSION"),
    }
}, {
    "model_name": "gpt-4",
    "litellm_params": {
        "model": "gpt-4",
        "api_key": os.getenv("OPENAI_API_KEY"),
    }
}]

router = Router(model_list=model_list)

# run inside an async function, or a notebook/environment with top-level await
# requests with model="gpt-3.5-turbo" pick a deployment where model_name="gpt-3.5-turbo"
response = await router.acompletion(model="gpt-3.5-turbo",
                                    messages=[{"role": "user", "content": "Hey, how's it going?"}])

print(response)

# requests with model="gpt-4" pick a deployment where model_name="gpt-4"
response = await router.acompletion(model="gpt-4",
                                    messages=[{"role": "user", "content": "Hey, how's it going?"}])

print(response)

info

See detailed proxy loadbalancing/fallback docs here

1. Setup model_list with multiple deployments

model_list:
  - model_name: gpt-3.5-turbo
    litellm_params:
      model: azure/<your-deployment-name>
      api_base: <your-azure-endpoint>
      api_key: <your-azure-api-key>
  - model_name: gpt-3.5-turbo
    litellm_params:
      model: azure/gpt-turbo-small-ca
      api_base: https://my-endpoint-canada-berri992.openai.azure.com/
      api_key: <your-azure-api-key>
  - model_name: gpt-3.5-turbo
    litellm_params:
      model: azure/gpt-turbo-large
      api_base: https://openai-france-1234.openai.azure.com/
      api_key: <your-azure-api-key>

2. Start proxy

litellm --config /path/to/config.yaml

3. Test it!

curl -X POST 'http://0.0.0.0:4000/chat/completions' \
-H 'Content-Type: application/json' \
-H 'Authorization: Bearer sk-1234' \
-d '{
  "model": "gpt-3.5-turbo",
  "messages": [
    {"role": "user", "content": "Hi there!"}
  ],
  "mock_testing_rate_limit_error": true
}'
Available Endpoints

Advanced - Routing Strategies ⭐️

Routing Strategies - Weighted Pick, Rate Limit Aware, Least Busy, Latency Based, Cost Based

Router provides the following strategies for routing your calls across multiple deployments:

🎉 NEW This is an async implementation of usage-based-routing.

Filters out a deployment if its tpm/rpm limit would be exceeded (this requires you to pass in the deployment's tpm/rpm limits).

Routes to deployment with lowest TPM usage for that minute.

In production, we use Redis to track usage (TPM/RPM) across multiple deployments. This implementation uses async redis calls (redis.incr and redis.mget).

For Azure, you get 6 RPM per 1000 TPM
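To make the selection logic concrete, here is a rough, illustrative sketch (hypothetical helper, not litellm's actual implementation) of filtering out over-limit deployments and picking the one with the lowest TPM usage for the current minute, plus the Azure TPM-to-RPM arithmetic:

# Illustrative sketch of rate-limit-aware selection (hypothetical helper, not litellm's code).
def pick_deployment(deployments, usage, request_tokens):
    """deployments: list of dicts with "id", "tpm", "rpm" limits.
    usage: dict of id -> {"tpm": tokens_used_this_minute, "rpm": requests_this_minute}."""
    candidates = []
    for d in deployments:
        u = usage.get(d["id"], {"tpm": 0, "rpm": 0})
        # filter out deployments whose tpm/rpm limits would be exceeded
        if u["tpm"] + request_tokens > d["tpm"] or u["rpm"] + 1 > d["rpm"]:
            continue
        candidates.append((u["tpm"], d))
    if not candidates:
        raise RuntimeError("No deployments available (all over their tpm/rpm limits)")
    # route to the deployment with the lowest TPM usage for this minute
    return min(candidates, key=lambda pair: pair[0])[1]

# For Azure, 1000 TPM maps to 6 RPM, so a 100,000 TPM deployment allows ~600 RPM.
azure_rpm = 100_000 // 1000 * 6  # = 600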

import os
from litellm import Router

model_list = [{
    "model_name": "gpt-3.5-turbo",
    "litellm_params": {
        "model": "azure/chatgpt-v-2",
        "api_key": os.getenv("AZURE_API_KEY"),
        "api_version": os.getenv("AZURE_API_VERSION"),
        "api_base": os.getenv("AZURE_API_BASE"),
        "tpm": 100000,
        "rpm": 10000,
    },
}, {
    "model_name": "gpt-3.5-turbo",
    "litellm_params": {
        "model": "azure/chatgpt-functioncalling",
        "api_key": os.getenv("AZURE_API_KEY"),
        "api_version": os.getenv("AZURE_API_VERSION"),
        "api_base": os.getenv("AZURE_API_BASE"),
        "tpm": 100000,
        "rpm": 1000,
    },
}, {
    "model_name": "gpt-3.5-turbo",
    "litellm_params": {
        "model": "gpt-3.5-turbo",
        "api_key": os.getenv("OPENAI_API_KEY"),
        "tpm": 100000,
        "rpm": 1000,
    },
}]

router = Router(model_list=model_list,
                redis_host=os.environ["REDIS_HOST"],
                redis_password=os.environ["REDIS_PASSWORD"],
                redis_port=os.environ["REDIS_PORT"],
                routing_strategy="usage-based-routing-v2",
                enable_pre_call_checks=True,
                )

response = await router.acompletion(model="gpt-3.5-turbo",
                                    messages=[{"role": "user", "content": "Hey, how's it going?"}])

print(response)

1. Set strategy in config

model_list:
  - model_name: gpt-3.5-turbo
    litellm_params:
      model: azure/chatgpt-v-2
      api_key: os.environ/AZURE_API_KEY
      api_version: os.environ/AZURE_API_VERSION
      api_base: os.environ/AZURE_API_BASE
      tpm: 100000
      rpm: 10000
  - model_name: gpt-3.5-turbo
    litellm_params:
      model: gpt-3.5-turbo
      api_key: os.environ/OPENAI_API_KEY
      tpm: 100000
      rpm: 1000

router_settings:
  routing_strategy: usage-based-routing-v2
  redis_host: <your-redis-host>
  redis_password: <your-redis-password>
  redis_port: <your-redis-port>
  enable_pre_call_checks: true

general_settings:
  master_key: sk-1234

2. Start proxy

litellm --config /path/to/config.yaml

3. Test it!

curl --location 'http://localhost:4000/v1/chat/completions' \
--header 'Content-Type: application/json' \
--header 'Authorization: Bearer sk-1234' \
--data '{
  "model": "gpt-3.5-turbo",
  "messages": [{"role": "user", "content": "Hey, how is it going?"}]
}'

Picks the deployment with the lowest response time.

It caches and updates each deployment's response times based on when requests were sent to and received from that deployment.
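Conceptually, the bookkeeping looks something like this minimal sketch (hypothetical helper names, not litellm internals): keep recent latency samples per deployment and pick the deployment with the lowest average within the configured time window (the ttl described under "Set Time Window" below):

import time
from collections import defaultdict

# Hypothetical latency tracker, for illustration only (not litellm's implementation).
latencies = defaultdict(list)  # deployment_id -> [(timestamp, seconds), ...]

def record(deployment_id: str, seconds: float, ttl: float = 3600):
    now = time.time()
    latencies[deployment_id].append((now, seconds))
    # drop samples older than the ttl window
    latencies[deployment_id] = [(t, s) for t, s in latencies[deployment_id] if now - t <= ttl]

def pick_lowest_latency(deployment_ids):
    def avg(dep):
        samples = [s for _, s in latencies[dep]] or [0.0]
        return sum(samples) / len(samples)
    return min(deployment_ids, key=avg)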

How to test

import asyncio
from litellm import Router

model_list = [{ ... }]  # your deployments, as in the examples above

# assumed for this test snippet
model = "gpt-3.5-turbo"
messages = [{"role": "user", "content": "Hey, how's it going?"}]

router = Router(model_list=model_list,
                routing_strategy="latency-based-routing",
                enable_pre_call_checks=True,
                )

# make 2 calls so latencies get recorded
tasks = []
response = None
final_response = None
for _ in range(2):
    tasks.append(router.acompletion(model=model, messages=messages))
response = await asyncio.gather(*tasks)

if response is not None:
    await asyncio.sleep(1)  # let the latency cache update
    picked_deployment = router.lowestlatency_logger.get_available_deployments(
        model_group=model, healthy_deployments=router.healthy_deployments
    )
    final_response = await router.acompletion(model=model, messages=messages)
    print(f"min deployment id: {picked_deployment}")
    print(f"model id: {final_response._hidden_params['model_id']}")
    assert (
        final_response._hidden_params["model_id"]
        == picked_deployment["model_info"]["id"]
    )
Set Time Window

Set time window for how far back to consider when averaging latency for a deployment.

In Router

router = Router(..., routing_strategy_args={"ttl": 10})

In Proxy

router_settings:
  routing_strategy_args: {"ttl": 10}
Set Lowest Latency Buffer

Set a buffer within which deployments are candidates for making calls to.

E.g.

if you have 5 deployments

https://litellm-prod-1.openai.azure.com/: 0.07s
https://litellm-prod-2.openai.azure.com/: 0.1s
https://litellm-prod-3.openai.azure.com/: 0.1s
https://litellm-prod-4.openai.azure.com/: 0.1s
https://litellm-prod-5.openai.azure.com/: 4.66s

To prevent initially overloading prod-1 with all requests, we can set a buffer of 50%, so that deployments prod-2, prod-3, and prod-4 are also considered.
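As an illustration only, here is the arithmetic behind that 50% buffer (assuming the buffer is applied as a fraction above the lowest observed latency):

# Illustrative only: which deployments fall inside a 50% lowest-latency buffer?
latencies = {
    "https://litellm-prod-1.openai.azure.com/": 0.07,
    "https://litellm-prod-2.openai.azure.com/": 0.1,
    "https://litellm-prod-3.openai.azure.com/": 0.1,
    "https://litellm-prod-4.openai.azure.com/": 0.1,
    "https://litellm-prod-5.openai.azure.com/": 4.66,
}

lowest_latency_buffer = 0.5
lowest = min(latencies.values())                  # 0.07s
threshold = lowest * (1 + lowest_latency_buffer)  # 0.105s
candidates = [d for d, l in latencies.items() if l <= threshold]
print(candidates)  # prod-1 through prod-4; prod-5 (4.66s) is excluded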

In Router

router = Router(..., routing_strategy_args={"lowest_latency_buffer": 0.5})

In Proxy

router_settings:
  routing_strategy_args: {"lowest_latency_buffer": 0.5}

Default: Picks a deployment based on the provided requests per minute (rpm) or tokens per minute (tpm).

If rpm or tpm is not provided, it randomly picks a deployment.

You can also set a weight param, to specify how often a given deployment should get picked.
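For intuition, a weighted shuffle can be sketched with random.choices, picking deployments with probability proportional to their rpm (or weight). This is illustrative only, not litellm's implementation:

import random

# deployments are picked with probability proportional to their rpm (or weight)
deployments = [
    {"id": "azure/chatgpt-v-2", "rpm": 900},
    {"id": "azure/chatgpt-functioncalling", "rpm": 10},
]

picked = random.choices(deployments, weights=[d["rpm"] for d in deployments], k=1)[0]
print(picked["id"])  # ~99% of the time this is azure/chatgpt-v-2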

LiteLLM Proxy Config.yaml

model_list:
  - model_name: gpt-3.5-turbo
    litellm_params:
      model: azure/chatgpt-v-2
      api_key: os.environ/AZURE_API_KEY
      api_version: os.environ/AZURE_API_VERSION
      api_base: os.environ/AZURE_API_BASE
      rpm: 900
  - model_name: gpt-3.5-turbo
    litellm_params:
      model: azure/chatgpt-functioncalling
      api_key: os.environ/AZURE_API_KEY
      api_version: os.environ/AZURE_API_VERSION
      api_base: os.environ/AZURE_API_BASE
      rpm: 10
Python SDK

import os
import asyncio
from litellm import Router

model_list = [{
    "model_name": "gpt-3.5-turbo",
    "litellm_params": {
        "model": "azure/chatgpt-v-2",
        "api_key": os.getenv("AZURE_API_KEY"),
        "api_version": os.getenv("AZURE_API_VERSION"),
        "api_base": os.getenv("AZURE_API_BASE"),
        "rpm": 900,
    }
}, {
    "model_name": "gpt-3.5-turbo",
    "litellm_params": {
        "model": "azure/chatgpt-functioncalling",
        "api_key": os.getenv("AZURE_API_KEY"),
        "api_version": os.getenv("AZURE_API_VERSION"),
        "api_base": os.getenv("AZURE_API_BASE"),
        "rpm": 10,
    }
}]

router = Router(model_list=model_list, routing_strategy="simple-shuffle")

async def router_acompletion():
    response = await router.acompletion(
        model="gpt-3.5-turbo",
        messages=[{"role": "user", "content": "Hey, how's it going?"}]
    )
    print(response)
    return response

asyncio.run(router_acompletion())
LiteLLM Proxy Config.yaml

model_list:
  - model_name: gpt-3.5-turbo
    litellm_params:
      model: azure/chatgpt-v-2
      api_key: os.environ/AZURE_API_KEY
      api_version: os.environ/AZURE_API_VERSION
      api_base: os.environ/AZURE_API_BASE
      weight: 9
  - model_name: gpt-3.5-turbo
    litellm_params:
      model: azure/chatgpt-functioncalling
      api_key: os.environ/AZURE_API_KEY
      api_version: os.environ/AZURE_API_VERSION
      api_base: os.environ/AZURE_API_BASE
      weight: 1
Python SDK

import os
import asyncio
from litellm import Router

model_list = [{
    "model_name": "gpt-3.5-turbo",
    "litellm_params": {
        "model": "azure/chatgpt-v-2",
        "api_key": os.getenv("AZURE_API_KEY"),
        "api_version": os.getenv("AZURE_API_VERSION"),
        "api_base": os.getenv("AZURE_API_BASE"),
        "weight": 9,
    }
}, {
    "model_name": "gpt-3.5-turbo",
    "litellm_params": {
        "model": "azure/chatgpt-functioncalling",
        "api_key": os.getenv("AZURE_API_KEY"),
        "api_version": os.getenv("AZURE_API_VERSION"),
        "api_base": os.getenv("AZURE_API_BASE"),
        "weight": 1,
    }
}]

router = Router(model_list=model_list, routing_strategy="simple-shuffle")

async def router_acompletion():
    response = await router.acompletion(
        model="gpt-3.5-turbo",
        messages=[{"role": "user", "content": "Hey, how's it going?"}]
    )
    print(response)
    return response

asyncio.run(router_acompletion())

This will route to the deployment with the lowest TPM usage for that minute.

In production, we use Redis to track usage (TPM/RPM) across multiple deployments.

If you pass in the deployment's tpm/rpm limits, this will also check against them and filter out any deployment whose limits would be exceeded.

For Azure, your RPM = TPM/6.

import os
from litellm import Router

model_list = [{
    "model_name": "gpt-3.5-turbo",
    "litellm_params": {
        "model": "azure/chatgpt-v-2",
        "api_key": os.getenv("AZURE_API_KEY"),
        "api_version": os.getenv("AZURE_API_VERSION"),
        "api_base": os.getenv("AZURE_API_BASE")
    },
    "tpm": 100000,
    "rpm": 10000,
}, {
    "model_name": "gpt-3.5-turbo",
    "litellm_params": {
        "model": "azure/chatgpt-functioncalling",
        "api_key": os.getenv("AZURE_API_KEY"),
        "api_version": os.getenv("AZURE_API_VERSION"),
        "api_base": os.getenv("AZURE_API_BASE")
    },
    "tpm": 100000,
    "rpm": 1000,
}, {
    "model_name": "gpt-3.5-turbo",
    "litellm_params": {
        "model": "gpt-3.5-turbo",
        "api_key": os.getenv("OPENAI_API_KEY"),
    },
    "tpm": 100000,
    "rpm": 1000,
}]

router = Router(model_list=model_list,
                redis_host=os.environ["REDIS_HOST"],
                redis_password=os.environ["REDIS_PASSWORD"],
                redis_port=os.environ["REDIS_PORT"],
                routing_strategy="usage-based-routing",
                enable_pre_call_checks=True,
                )

response = await router.acompletion(model="gpt-3.5-turbo",
                                    messages=[{"role": "user", "content": "Hey, how's it going?"}])

print(response)

Picks the deployment that is handling the least number of ongoing calls.

How to test

import os
import asyncio
from litellm import Router

model_list = [{
    "model_name": "gpt-3.5-turbo",
    "litellm_params": {
        "model": "azure/chatgpt-v-2",
        "api_key": os.getenv("AZURE_API_KEY"),
        "api_version": os.getenv("AZURE_API_VERSION"),
        "api_base": os.getenv("AZURE_API_BASE"),
    }
}, {
    "model_name": "gpt-3.5-turbo",
    "litellm_params": {
        "model": "azure/chatgpt-functioncalling",
        "api_key": os.getenv("AZURE_API_KEY"),
        "api_version": os.getenv("AZURE_API_VERSION"),
        "api_base": os.getenv("AZURE_API_BASE"),
    }
}, {
    "model_name": "gpt-3.5-turbo",
    "litellm_params": {
        "model": "gpt-3.5-turbo",
        "api_key": os.getenv("OPENAI_API_KEY"),
    }
}]

router = Router(model_list=model_list, routing_strategy="least-busy")

async def router_acompletion():
    response = await router.acompletion(
        model="gpt-3.5-turbo",
        messages=[{"role": "user", "content": "Hey, how's it going?"}]
    )
    print(response)
    return response

asyncio.run(router_acompletion())

Plug in a custom routing strategy to select deployments

Step 1. Define your custom routing strategy


from typing import Dict, List, Optional, Union

from litellm.router import CustomRoutingStrategyBase

class CustomRoutingStrategy(CustomRoutingStrategyBase):
    async def async_get_available_deployment(
        self,
        model: str,
        messages: Optional[List[Dict[str, str]]] = None,
        input: Optional[Union[str, List]] = None,
        specific_deployment: Optional[bool] = False,
        request_kwargs: Optional[Dict] = None,
    ):
        """
        Asynchronously retrieves the available deployment based on the given parameters.

        Args:
            model (str): The name of the model.
            messages (Optional[List[Dict[str, str]]], optional): The list of messages for a given request. Defaults to None.
            input (Optional[Union[str, List]], optional): The input for a given embedding request. Defaults to None.
            specific_deployment (Optional[bool], optional): Whether to retrieve a specific deployment. Defaults to False.
            request_kwargs (Optional[Dict], optional): Additional request keyword arguments. Defaults to None.

        Returns:
            Returns an element from litellm.router.model_list
        """
        print("In CUSTOM async get available deployment")
        model_list = router.model_list  # `router` is the Router instance from Step 2
        print("router model list=", model_list)
        for model in model_list:
            if isinstance(model, dict):
                if model["litellm_params"]["model"] == "openai/very-special-endpoint":
                    return model
        pass

    def get_available_deployment(
        self,
        model: str,
        messages: Optional[List[Dict[str, str]]] = None,
        input: Optional[Union[str, List]] = None,
        specific_deployment: Optional[bool] = False,
        request_kwargs: Optional[Dict] = None,
    ):
        """
        Synchronously retrieves the available deployment based on the given parameters.

        Args:
            model (str): The name of the model.
            messages (Optional[List[Dict[str, str]]], optional): The list of messages for a given request. Defaults to None.
            input (Optional[Union[str, List]], optional): The input for a given embedding request. Defaults to None.
            specific_deployment (Optional[bool], optional): Whether to retrieve a specific deployment. Defaults to False.
            request_kwargs (Optional[Dict], optional): Additional request keyword arguments. Defaults to None.

        Returns:
            Returns an element from litellm.router.model_list
        """
        pass

Step 2. Initialize Router with custom routing strategy

from litellm import Router

router = Router(
    model_list=[
        {
            "model_name": "azure-model",
            "litellm_params": {
                "model": "openai/very-special-endpoint",
                "api_base": "https://exampleopenaiendpoint-production.up.railway.app/",
                "api_key": "fake-key",
            },
            "model_info": {"id": "very-special-endpoint"},
        },
        {
            "model_name": "azure-model",
            "litellm_params": {
                "model": "openai/fast-endpoint",
                "api_base": "https://exampleopenaiendpoint-production.up.railway.app/",
                "api_key": "fake-key",
            },
            "model_info": {"id": "fast-endpoint"},
        },
    ],
    set_verbose=True,
    debug_level="DEBUG",
    timeout=1,
)

router.set_custom_routing_strategy(CustomRoutingStrategy())

Step 3. Test your routing strategy. Expect your custom routing strategy to be called when running router.acompletion requests

for _ in range(10):
    response = await router.acompletion(
        model="azure-model", messages=[{"role": "user", "content": "hello"}]
    )
    print(response)
    _picked_model_id = response._hidden_params["model_id"]
    print("picked model=", _picked_model_id)

Picks the deployment with the lowest cost.

How this works: for each healthy deployment, the router looks up the model's cost (using LiteLLM's model cost map, or the custom pricing you set, see below) and routes to the cheapest one.
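As a rough sketch of that comparison (illustrative only, and assuming both model names appear in litellm.model_cost):

import litellm

# Illustrative sketch of cost-based selection, not the router's actual code.
# litellm.model_cost maps model names to per-token pricing.
deployments = ["gpt-4", "groq/llama3-8b-8192"]

def deployment_cost(model_name: str) -> float:
    info = litellm.model_cost.get(model_name, {})
    return info.get("input_cost_per_token", 0.0) + info.get("output_cost_per_token", 0.0)

cheapest = min(deployments, key=deployment_cost)
print(cheapest)  # expected: groq/llama3-8b-8192 (cheaper than gpt-4)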

from litellm import Router
import asyncio

model_list = [
    {
        "model_name": "gpt-3.5-turbo",
        "litellm_params": {"model": "gpt-4"},
        "model_info": {"id": "openai-gpt-4"},
    },
    {
        "model_name": "gpt-3.5-turbo",
        "litellm_params": {"model": "groq/llama3-8b-8192"},
        "model_info": {"id": "groq-llama"},
    },
]

router = Router(model_list=model_list, routing_strategy="cost-based-routing")

async def router_acompletion():
    response = await router.acompletion(
        model="gpt-3.5-turbo",
        messages=[{"role": "user", "content": "Hey, how's it going?"}]
    )
    print(response)

    print(response._hidden_params["model_id"])
    return response

asyncio.run(router_acompletion())

Using Custom Input/Output pricing

Set litellm_params["input_cost_per_token"] and litellm_params["output_cost_per_token"] to use custom pricing when routing.

import asyncio
from litellm import Router

model_list = [
    {
        "model_name": "gpt-3.5-turbo",
        "litellm_params": {
            "model": "azure/chatgpt-v-2",
            "input_cost_per_token": 0.00003,
            "output_cost_per_token": 0.00003,
        },
        "model_info": {"id": "chatgpt-v-experimental"},
    },
    {
        "model_name": "gpt-3.5-turbo",
        "litellm_params": {
            "model": "azure/chatgpt-v-1",
            "input_cost_per_token": 0.000000001,
            "output_cost_per_token": 0.00000001,
        },
        "model_info": {"id": "chatgpt-v-1"},
    },
    {
        "model_name": "gpt-3.5-turbo",
        "litellm_params": {
            "model": "azure/chatgpt-v-5",
            "input_cost_per_token": 10,
            "output_cost_per_token": 12,
        },
        "model_info": {"id": "chatgpt-v-5"},
    },
]

router = Router(model_list=model_list, routing_strategy="cost-based-routing")

async def router_acompletion():
    response = await router.acompletion(
        model="gpt-3.5-turbo",
        messages=[{"role": "user", "content": "Hey, how's it going?"}]
    )
    print(response)

    print(response._hidden_params["model_id"])
    return response

asyncio.run(router_acompletion())
Basic Reliability

Weighted Deployments

Set weight on a deployment to pick one deployment more often than others.

This works with the simple-shuffle routing strategy (the default, if no routing strategy is selected).

import os
from litellm import Router

model_list = [
    {
        "model_name": "o1",
        "litellm_params": {
            "model": "o1-preview",
            "api_key": os.getenv("OPENAI_API_KEY"),
            "weight": 1
        },
    },
    {
        "model_name": "o1",
        "litellm_params": {
            "model": "o1-preview",
            "api_key": os.getenv("OPENAI_API_KEY"),
            "weight": 2  # picked 2x more often than the deployment above
        },
    },
]

router = Router(model_list=model_list, routing_strategy="simple-shuffle")

response = await router.acompletion(
    model="o1",
    messages=[{"role": "user", "content": "Hey, how's it going?"}]
)
print(response)
model_list:
  - model_name: o1
    litellm_params:
      model: o1
      api_key: os.environ/OPENAI_API_KEY
      weight: 1
  - model_name: o1
    litellm_params:
      model: o1-preview
      api_key: os.environ/OPENAI_API_KEY
      weight: 2
Max Parallel Requests (ASYNC)

Used as a semaphore for async requests on the router. Limits the max concurrent calls made to a deployment. Useful in high-traffic scenarios.

If tpm/rpm is set, and no max parallel request limit given, we use the RPM or calculated RPM (tpm/1000/6) as the max parallel request limit.
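A quick sanity check of that formula (illustrative arithmetic only):

# If only tpm is set, the default max parallel request limit is derived as tpm / 1000 / 6.
tpm = 100_000
default_max_parallel_requests = tpm / 1000 / 6  # ≈ 16.7, i.e. roughly 16 concurrent requests
print(default_max_parallel_requests)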

from litellm import Router

model_list = [{
    "model_name": "gpt-4",
    "litellm_params": {
        "model": "azure/gpt-4",
        ...
        "max_parallel_requests": 10  # set per deployment
    }
}]

# or set a default across all deployments
router = Router(model_list=model_list, default_max_parallel_requests=20)



See Code

Cooldowns

Set the limit for how many calls a model is allowed to fail in a minute before it is cooled down for cooldown_time seconds.

from litellm import Router

model_list = [{...}]

router = Router(model_list=model_list,
                allowed_fails=1,    # cooldown a deployment if it fails > 1 call in a minute
                cooldown_time=100   # cooldown the deployment for 100 seconds if its failures exceed allowed_fails
                )

user_message = "Hello, whats the weather in San Francisco??"
messages = [{"content": user_message, "role": "user"}]

# normal call
response = router.completion(model="gpt-3.5-turbo", messages=messages)

print(f"response: {response}")

Set Global Value

router_settings:
  allowed_fails: 3
  cooldown_time: 30

Defaults: if these aren't set, a failing deployment is cooled down for 5 seconds (see the cooldown conditions table below).

Set Per Model

model_list:
  - model_name: fake-openai-endpoint
    litellm_params:
      model: predibase/llama-3-8b-instruct
      api_key: os.environ/PREDIBASE_API_KEY
      tenant_id: os.environ/PREDIBASE_TENANT_ID
      max_new_tokens: 256
      cooldown_time: 0  # disable cooldown for this deployment

Expected Response

No deployments available for selected model, Try again in 60 seconds. Passed model=claude-3-5-sonnet. pre-call-checks=False, allowed_model_region=n/a.
Disable cooldowns

In Router

from litellm import Router

router = Router(..., disable_cooldowns=True)

In Proxy

router_settings:
  disable_cooldowns: True
How Cooldowns Work

Cooldowns apply to individual deployments, not entire model groups. The router isolates failures to specific deployments while keeping healthy alternatives available.

What is a deployment?

A deployment is a single entry in your config.yaml model list. Each deployment represents a unique configuration with its own litellm_params.

LiteLLM generates a unique model_id for each deployment by creating a deterministic hash of all the litellm_params. This allows the router to track and manage each deployment independently.
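As a rough illustration of what a deterministic hash over litellm_params means (hypothetical scheme, not litellm's exact hashing):

import hashlib
import json

# Hypothetical illustration: a stable hash over litellm_params yields a stable model_id.
litellm_params = {
    "model": "anthropic/claude-sonnet-4-20250514",
    "api_key": "<our-real-key>",
}

# Sorting keys makes the JSON (and therefore the hash) deterministic.
model_id = hashlib.sha256(
    json.dumps(litellm_params, sort_keys=True).encode()
).hexdigest()
print(model_id)  # same params -> same id; changed params -> different id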

Example: Multiple deployments for the same model

Load Balancing config.yaml

model_list:
  - model_name: sonnet-4
    litellm_params:
      model: anthropic/claude-sonnet-4-20250514
      api_key: <our-real-key>

  - model_name: byok-sonnet-4
    litellm_params:
      model: anthropic/claude-sonnet-4-20250514
      api_key: <customer-managed-key>
      api_base: https://proxy.litellm.ai/api.anthropic.com

  - model_name: sonnet-4
    litellm_params:
      model: vertex_ai/claude-sonnet-4-20250514
      vertex_project: my-project

Each deployment gets a unique model_id (e.g., 1234567890, 9129922, 4982929292) that the router uses for tracking health and cooldown status.

When are deployments cooled down?

The router automatically cools down deployments based on the following conditions:

| Condition | Trigger | Cooldown Duration |
| --- | --- | --- |
| Rate Limiting (429) | Immediate on 429 response | 5 seconds (default) |
| High Failure Rate | >50% failures in current minute | 5 seconds (default) |
| Non-Retryable Errors | 401 (Auth), 404 (Not Found), 408 (Timeout) | 5 seconds (default) |

During cooldown, the specific deployment is temporarily removed from the available pool, while other healthy deployments continue serving requests.

Cooldown Recovery

Deployments automatically recover from cooldown after the cooldown period expires (a minimal bookkeeping sketch follows the list below). The router will:

  1. Monitor cooldown timers for each deployment
  2. Automatically re-enable deployments when cooldown expires
  3. Gradually reintroduce cooled-down deployments to the rotation
  4. Reset failure counters once the deployment is healthy again
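A minimal sketch of this bookkeeping (hypothetical helpers, not litellm's internals): track a cooldown-expiry timestamp per deployment id and filter cooled-down deployments out of the candidate pool until the timer runs out.

import time

# deployment_id -> unix timestamp when the cooldown expires (hypothetical store)
cooldown_until = {}

def cooldown(deployment_id: str, seconds: float = 5.0):
    cooldown_until[deployment_id] = time.time() + seconds

def available(deployment_ids):
    now = time.time()
    # deployments whose cooldown has expired are automatically re-enabled
    return [d for d in deployment_ids if cooldown_until.get(d, 0) <= now]

cooldown("1234567890")                       # e.g. after a 429 from this deployment
print(available(["1234567890", "9129922"]))  # only the healthy deployment is returned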
Real-World Example

Consider this high-availability setup with multiple providers:

Load Balancing config.yaml

model_list:
  - model_name: sonnet-4
    litellm_params:
      model: anthropic/claude-sonnet-4-20250514
      api_key: <anthropic-key>

  - model_name: byok-sonnet-4
    litellm_params:
      model: anthropic/claude-sonnet-4-20250514
      api_key: <customer-managed-key>
      api_base: https://proxy.litellm.ai/api.anthropic.com

  - model_name: sonnet-4
    litellm_params:
      model: vertex_ai/claude-sonnet-4-20250514
      vertex_project: my-project

Failure Scenario: if the Anthropic deployment starts returning 429s, only that deployment is cooled down; the Vertex AI deployment continues serving sonnet-4 requests until the cooldown expires.

Retries

For both async + sync functions, we support retrying failed requests.

For RateLimitError we implement exponential backoff.

For generic errors, we retry immediately
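For intuition, exponential backoff roughly means doubling the wait between attempts; an illustrative sketch (not litellm's exact retry schedule):

# Illustrative: wait 1s, 2s, 4s, ... between retries of a rate-limited request.
base_delay = 1.0
num_retries = 3
delays = [base_delay * (2 ** attempt) for attempt in range(num_retries)]
print(delays)  # [1.0, 2.0, 4.0]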

Here's a quick look at how we can set num_retries = 3:

from litellm import Router

model_list = [{...}]

router = Router(model_list=model_list,
                num_retries=3)

user_message = "Hello, whats the weather in San Francisco??"
messages = [{"content": user_message, "role": "user"}]


response = router.completion(model="gpt-3.5-turbo", messages=messages)

print(f"response: {response}")

We also support setting minimum time to wait before retrying a failed request. This is via the retry_after param.

from litellm import Router

model_list = [{...}]

router = Router(model_list=model_list,
                num_retries=3, retry_after=5)

user_message = "Hello, whats the weather in San Francisco??"
messages = [{"content": user_message, "role": "user"}]


response = router.completion(model="gpt-3.5-turbo", messages=messages)

print(f"response: {response}")
[Advanced]: Custom Retries, Cooldowns based on Error Type

See All Exception Types

Example:

retry_policy = RetryPolicy(
    ContentPolicyViolationErrorRetries=3,  # run 3 retries for ContentPolicyViolationErrors
    AuthenticationErrorRetries=0,          # run 0 retries for AuthenticationErrors
)

allowed_fails_policy = AllowedFailsPolicy(
    ContentPolicyViolationErrorAllowedFails=1000,  # allow 1000 ContentPolicyViolationErrors before cooling down a deployment
    RateLimitErrorAllowedFails=100,                # allow 100 RateLimitErrors before cooling down a deployment
)

Example Usage

import os
import litellm
from litellm.router import RetryPolicy, AllowedFailsPolicy

retry_policy = RetryPolicy(
    ContentPolicyViolationErrorRetries=3,
    AuthenticationErrorRetries=0,
    BadRequestErrorRetries=1,
    TimeoutErrorRetries=2,
    RateLimitErrorRetries=3,
)

allowed_fails_policy = AllowedFailsPolicy(
    ContentPolicyViolationErrorAllowedFails=1000,
    RateLimitErrorAllowedFails=100,
)

router = litellm.Router(
    model_list=[
        {
            "model_name": "gpt-3.5-turbo",
            "litellm_params": {
                "model": "azure/chatgpt-v-2",
                "api_key": os.getenv("AZURE_API_KEY"),
                "api_version": os.getenv("AZURE_API_VERSION"),
                "api_base": os.getenv("AZURE_API_BASE"),
            },
        },
        {
            "model_name": "bad-model",
            "litellm_params": {
                "model": "azure/chatgpt-v-2",
                "api_key": "bad-key",
                "api_version": os.getenv("AZURE_API_VERSION"),
                "api_base": os.getenv("AZURE_API_BASE"),
            },
        },
    ],
    retry_policy=retry_policy,
    allowed_fails_policy=allowed_fails_policy,
)

# example call
response = await router.acompletion(
    model="gpt-3.5-turbo",
    messages=[{"role": "user", "content": "Hey, how's it going?"}],
)
router_settings:
  retry_policy: {
    "BadRequestErrorRetries": 3,
    "ContentPolicyViolationErrorRetries": 4
  }
  allowed_fails_policy: {
    "ContentPolicyViolationErrorAllowedFails": 1000,
    "RateLimitErrorAllowedFails": 100
  }
Caching

In production, we recommend using a Redis cache. For quickly testing things locally, we also support simple in-memory caching.

In-memory Cache

router = Router(model_list=model_list,
                cache_responses=True)

print(response)

Redis Cache

router = Router(model_list=model_list,
                redis_host=os.getenv("REDIS_HOST"),
                redis_password=os.getenv("REDIS_PASSWORD"),
                redis_port=os.getenv("REDIS_PORT"),
                cache_responses=True)

print(response)

Pass in Redis URL, additional kwargs

router = Router(model_list=model_list,
                redis_url=os.getenv("REDIS_URL"),
                cache_kwargs={},  # additional kwargs to pass to the cache
                cache_responses=True)
Pre-Call Checks (Context Window, EU-Regions)

Enable pre-call checks to filter out:

  1. deployments with context window limit < messages for a call.
  2. deployments outside of eu-region

1. Enable pre-call checks

from litellm import Router 

router = Router(model_list=model_list, enable_pre_call_checks=True)

2. Set Model List

For context window checks on Azure deployments, set the base model. Pick the base model from this list; all the Azure models start with azure/.

For 'eu-region' filtering, set 'region_name' on the deployment.

Note: We automatically infer region_name for Vertex AI, Bedrock, and IBM WatsonxAI based on your litellm params. For Azure, set litellm.enable_preview = True.

See Code

import os

model_list = [
    {
        "model_name": "gpt-3.5-turbo",
        "litellm_params": {
            "model": "azure/chatgpt-v-2",
            "api_key": os.getenv("AZURE_API_KEY"),
            "api_version": os.getenv("AZURE_API_VERSION"),
            "api_base": os.getenv("AZURE_API_BASE"),
            "region_name": "eu",
            "base_model": "azure/gpt-35-turbo",
        },
    },
    {
        "model_name": "gpt-3.5-turbo",
        "litellm_params": {
            "model": "gpt-3.5-turbo-1106",
            "api_key": os.getenv("OPENAI_API_KEY"),
        },
    },
    {
        "model_name": "gemini-pro",
        "litellm_params": {
            "model": "vertex_ai/gemini-pro-1.5",
            "vertex_project": "adroit-crow-1234",
            "vertex_location": "us-east1",
        },
    },
]

router = Router(model_list=model_list, enable_pre_call_checks=True)

3. Test it!

"""
- Give a gpt-3.5-turbo model group with different context windows (4k vs. 16k)
- Send a 5k prompt
- Assert it works
"""
from litellm import Router
import os

model_list = [
    {
        "model_name": "gpt-3.5-turbo",
        "litellm_params": {
            "model": "azure/chatgpt-v-2",
            "api_key": os.getenv("AZURE_API_KEY"),
            "api_version": os.getenv("AZURE_API_VERSION"),
            "api_base": os.getenv("AZURE_API_BASE"),
            "base_model": "azure/gpt-35-turbo",
        },
        "model_info": {
            "base_model": "azure/gpt-35-turbo",
        }
    },
    {
        "model_name": "gpt-3.5-turbo",
        "litellm_params": {
            "model": "gpt-3.5-turbo-1106",
            "api_key": os.getenv("OPENAI_API_KEY"),
        },
    },
]

router = Router(model_list=model_list, enable_pre_call_checks=True)

text = "What is the meaning of 42?" * 5000

response = router.completion(
    model="gpt-3.5-turbo",
    messages=[
        {"role": "system", "content": text},
        {"role": "user", "content": "Who was Alexander?"},
    ],
)

print(f"response: {response}")
"""
- Give 2 gpt-3.5-turbo deployments, in eu + non-eu regions
- Make a call
- Assert it picks the eu-region model
"""

from litellm import Router
import os

model_list = [
    {
        "model_name": "gpt-3.5-turbo",
        "litellm_params": {
            "model": "azure/chatgpt-v-2",
            "api_key": os.getenv("AZURE_API_KEY"),
            "api_version": os.getenv("AZURE_API_VERSION"),
            "api_base": os.getenv("AZURE_API_BASE"),
            "region_name": "eu",
        },
        "model_info": {
            "id": "1"
        }
    },
    {
        "model_name": "gpt-3.5-turbo",
        "litellm_params": {
            "model": "gpt-3.5-turbo-1106",
            "api_key": os.getenv("OPENAI_API_KEY"),
        },
        "model_info": {
            "id": "2"
        }
    },
]

router = Router(model_list=model_list, enable_pre_call_checks=True)

response = router.completion(
    model="gpt-3.5-turbo",
    messages=[{"role": "user", "content": "Who was Alexander?"}],
)

print(f"response: {response}")

print(f"response id: {response._hidden_params['model_id']}")

info

Go here for how to do this on the proxy

Caching across model groups

If you want to cache across 2 different model groups (e.g. Azure deployments and OpenAI), use caching groups.

import asyncio
import os
import time
import traceback

import litellm
from litellm import Router

os.environ["OPENAI_API_KEY"] = ""
os.environ["AZURE_API_KEY"] = ""
os.environ["AZURE_API_BASE"] = ""
os.environ["AZURE_API_VERSION"] = ""

async def test_acompletion_caching_on_router_caching_groups():
    try:
        litellm.set_verbose = True
        model_list = [
            {
                "model_name": "openai-gpt-3.5-turbo",
                "litellm_params": {
                    "model": "gpt-3.5-turbo-0613",
                    "api_key": os.getenv("OPENAI_API_KEY"),
                },
            },
            {
                "model_name": "azure-gpt-3.5-turbo",
                "litellm_params": {
                    "model": "azure/chatgpt-v-2",
                    "api_key": os.getenv("AZURE_API_KEY"),
                    "api_base": os.getenv("AZURE_API_BASE"),
                    "api_version": os.getenv("AZURE_API_VERSION")
                },
            }
        ]

        messages = [
            {"role": "user", "content": f"write a one sentence poem {time.time()}?"}
        ]
        start_time = time.time()
        router = Router(model_list=model_list,
                        cache_responses=True,
                        caching_groups=[("openai-gpt-3.5-turbo", "azure-gpt-3.5-turbo")])
        response1 = await router.acompletion(model="openai-gpt-3.5-turbo", messages=messages, temperature=1)
        print(f"response1: {response1}")
        await asyncio.sleep(1)  # give the async cache time to be set
        response2 = await router.acompletion(model="azure-gpt-3.5-turbo", messages=messages, temperature=1)
        assert response1.id == response2.id
        assert len(response1.choices[0].message.content) > 0
        assert response1.choices[0].message.content == response2.choices[0].message.content
    except Exception as e:
        traceback.print_exc()

asyncio.run(test_acompletion_caching_on_router_caching_groups())
Alerting 🚨

Send alerts to Slack or your webhook URL for events like LLM API exceptions and slow or hanging LLM responses.

Get a slack webhook url from https://api.slack.com/messaging/webhooks

Usage

Initialize an AlertingConfig and pass it to litellm.Router. The following code will trigger an alert because api_key="bad_key" is invalid.

from litellm.router import AlertingConfig
import litellm
import os

router = litellm.Router(
    model_list=[
        {
            "model_name": "gpt-3.5-turbo",
            "litellm_params": {
                "model": "gpt-3.5-turbo",
                "api_key": "bad_key",
            },
        }
    ],
    alerting_config=AlertingConfig(
        alerting_threshold=10,                       # threshold for slow / hanging llm responses (in seconds)
        webhook_url=os.getenv("SLACK_WEBHOOK_URL"),  # webhook you want to send alerts to
    ),
)
try:
    await router.acompletion(
        model="gpt-3.5-turbo",
        messages=[{"role": "user", "content": "Hey, how's it going?"}],
    )
except Exception:
    pass
Track cost for Azure Deployments

Problem: Azure returns gpt-4 in the response when azure/gpt-4-1106-preview is used. This leads to inaccurate cost tracking.

Solution ✅: Set model_info["base_model"] on your router init so litellm uses the correct model for calculating Azure cost.

Step 1. Router Setup

import os
from litellm import Router

model_list = [
    {
        "model_name": "gpt-4-preview",
        "litellm_params": {
            "model": "azure/chatgpt-v-2",
            "api_key": os.getenv("AZURE_API_KEY"),
            "api_version": os.getenv("AZURE_API_VERSION"),
            "api_base": os.getenv("AZURE_API_BASE")
        },
        "model_info": {
            "base_model": "azure/gpt-4-1106-preview"
        }
    },
    {
        "model_name": "gpt-4-32k",
        "litellm_params": {
            "model": "azure/chatgpt-functioncalling",
            "api_key": os.getenv("AZURE_API_KEY"),
            "api_version": os.getenv("AZURE_API_VERSION"),
            "api_base": os.getenv("AZURE_API_BASE")
        },
        "model_info": {
            "base_model": "azure/gpt-4-32k"
        }
    }
]

router = Router(model_list=model_list)

Step 2. Access response_cost in the custom callback; litellm calculates the response cost for you.

import litellm
from litellm.integrations.custom_logger import CustomLogger

class MyCustomHandler(CustomLogger):
    def log_success_event(self, kwargs, response_obj, start_time, end_time):
        print("On Success")
        response_cost = kwargs.get("response_cost")
        print("response_cost=", response_cost)

customHandler = MyCustomHandler()
litellm.callbacks = [customHandler]

response = router.completion(
    model="gpt-4-32k",
    messages=[{"role": "user", "content": "Hi who are you"}]
)
Default litellm.completion/embedding params

You can also set default params for litellm completion/embedding calls. Here's how to do that:

from litellm import Router

fallback_dict = {"gpt-3.5-turbo": "gpt-3.5-turbo-16k"}

router = Router(model_list=model_list,
                default_litellm_params={"context_window_fallback_dict": fallback_dict})

user_message = "Hello, whats the weather in San Francisco??"
messages = [{"content": user_message, "role": "user"}]


response = router.completion(model="gpt-3.5-turbo", messages=messages)

print(f"response: {response}")
Custom Callbacks - Track API Key, API Endpoint, Model Used

If you need to track the api_key, api endpoint, model, and custom_llm_provider used for each completion call, you can set up a custom callback.

Usage
import litellm
from litellm import Router
from litellm.integrations.custom_logger import CustomLogger

class MyCustomHandler(CustomLogger):
    def log_success_event(self, kwargs, response_obj, start_time, end_time):
        print("On Success")
        print("kwargs=", kwargs)
        litellm_params = kwargs.get("litellm_params")
        api_key = litellm_params.get("api_key")
        api_base = litellm_params.get("api_base")
        custom_llm_provider = litellm_params.get("custom_llm_provider")
        response_cost = kwargs.get("response_cost")

        print("api_key=", api_key)
        print("api_base=", api_base)
        print("custom_llm_provider=", custom_llm_provider)
        print("response_cost=", response_cost)

    def log_failure_event(self, kwargs, response_obj, start_time, end_time):
        print("On Failure")
        print("kwargs=", kwargs)

customHandler = MyCustomHandler()

litellm.callbacks = [customHandler]

# model_list as defined in the examples above
router = Router(model_list=model_list, routing_strategy="simple-shuffle")

response = router.completion(
    model="gpt-3.5-turbo",
    messages=[{"role": "user", "content": "Hi who are you"}]
)
Deploy Router

If you want a server to load balance across different LLM APIs, use our LiteLLM Proxy Server

Debugging Router

Basic Debugging

Set Router(set_verbose=True)

from litellm import Router

router = Router(
    model_list=model_list,
    set_verbose=True
)

Detailed Debugging

Set Router(set_verbose=True, debug_level="DEBUG")

from litellm import Router

router = Router(
    model_list=model_list,
    set_verbose=True,
    debug_level="DEBUG"
)

Very Detailed Debugging

Set litellm.set_verbose=True and Router(set_verbose=True, debug_level="DEBUG")

from litellm import Router
import litellm

litellm.set_verbose = True

router = Router(
    model_list=model_list,
    set_verbose=True,
    debug_level="DEBUG"
)
Router General Settings

Usage

router = Router(model_list=..., router_general_settings=RouterGeneralSettings(async_only_mode=True))

Spec

class RouterGeneralSettings(BaseModel):
    async_only_mode: bool = Field(
        default=False
    )
    pass_through_all_models: bool = Field(
        default=False
    )
