速率限制

速率限制是我们的 API 对用户或客户端在指定时间内访问服务所施加的限制。

为什么要进行速率限制?

速率限制是 API 的一项常见做法,出于以下几个原因:

  • 它们有助于防止滥用或误用 API。例如,恶意行为者可能会向 API 发送大量请求,试图使其过载或造成服务中断。通过设置速率限制,OpenAI 可以防止此类活动。
  • 速率限制有助于确保所有人都可以公平地访问 API。如果某个用户或组织发送了过多的请求,可能会影响其他所有人的 API 访问速度。通过限制单个用户可以发送的请求数量,OpenAI 可以确保尽可能多的人能够在不受速度影响的情况下使用 API。
  • 速率限制可以帮助 OpenAI 管理其基础设施上的总负载。如果对 API 的请求突然激增,可能会影响服务器性能。通过设置速率限制,OpenAI 可以帮助维护平稳、一致的用户体验。

这些速率限制是如何运作的?

速率限制有五种衡量方式:RPM(每分钟请求数)、RPD(每日请求数)、TPM(每分钟令牌数)、TPD(每日令牌数)和 IPM(每分钟图像数)。无论哪种情况发生得更快,都会达到速率限制。例如,您可能会向 ChatCompletions 端点发送 20 个请求,每个请求包含 100,000 个令牌,这将达到您的限制(如果您的 RPM 为 20),即使您没有在 20 个请求中发送 150,000 个令牌(如果您的 TPM 限制为 150,000)。

批处理 API队列限制是根据提交给特定模型的批处理作业的总输入令牌数来计算的。来自挂起的批处理作业的令牌计入您的队列限制。一旦批处理作业完成,其令牌将不再计入该模型的限制。

其他值得注意的事情:

  • 速率限制是在组织级别和项目级别定义的,而不是用户级别。
  • 速率限制因使用的模型而异。
  • 还对组织每月可以在 API 上花费的金额施加限制。这些也称为“使用限制”。

使用层

您可以在帐户设置的限制部分查看组织的速率和使用限制。随着您对 OpenAI API 的使用以及您在我们的 API 上的支出增加,我们会自动将您升级到下一使用层。这通常会导致大多数模型的速率限制增加。

资格要求使用限制
免费版用户必须位于允许的地理位置每月 100 美元
第 1 层已支付 5 美元每月 100 美元
第 2 层已支付 50 美元,距离首次成功付款 7 天或更长时间每月 500 美元
第 3 层已支付 100 美元,距离首次成功付款 7 天或更长时间每月 1,000 美元
第 4 层已支付 250 美元,距离首次成功付款 14 天或更长时间每月 5,000 美元
第 5 层已支付 1,000 美元,距离首次成功付款 30 天或更长时间每月 15,000 美元

选择以下层之一以查看每个模型的速率限制摘要。

免费层速率限制

这是高级别的摘要,并且存在每个模型的限制(例如,一些旧模型或具有较大上下文窗口的模型具有不同的速率限制)。要查看您帐户的每个模型的确切速率限制,请访问帐户设置中的限制部分。

模型RPMRPDTPM批处理队列限制
gpt-3.5-turbo320040,000200,000
text-embedding-3-large3,0002001,000,0003,000,000
text-embedding-3-small3,0002001,000,0003,000,000
text-embedding-ada-0023,0002001,000,0003,000,000
whisper-13200--
tts-13200--
dall-e-2每分钟 5 张图像---
dall-e-3每分钟 1 张图像---

报头中的速率限制

除了在帐户页面上查看您的速率限制外,您还可以在 HTTP 响应的标头中查看有关您的速率限制的重要信息,例如剩余请求、令牌和其他元数据。

您可以期望看到以下标头字段:

字段示例值描述
x-ratelimit-limit-requests60在用完速率限制之前允许的最大请求数。
x-ratelimit-limit-tokens150,000在用完速率限制之前允许的最大令牌数。
x-ratelimit-remaining-requests59在用完速率限制之前允许的剩余请求数。
x-ratelimit-remaining-tokens149,984在用完速率限制之前允许的剩余令牌数。
x-ratelimit-reset-requests1s速率限制(基于请求)重置为其初始状态所需的时间。
x-ratelimit-reset-tokens6m0s速率限制(基于令牌)重置为其初始状态所需的时间。

错误缓解

OpenAI Cookbook包含一个Python 笔记本电脑,其中解释了如何避免速率限制错误,以及一个示例Python 脚本,用于在批处理 API 请求时保持在速率限制之下。

在提供程序化访问、批量处理功能和自动社交媒体发布时要小心——只为值得信赖的客户启用这些功能。

要防止自动化和高容量的误用,请针对单个用户在指定的时间段内(每日、每周或每月)设置使用限制。考虑为超过限制的用户实施硬上限或手动审核流程。

带有指数回退的重试

避免速率限制错误的一种简单方法是自动重试带有随机指数回退的请求。重试带有指数回退意味着在遇到速率限制错误时执行短暂的休眠,然后重试不成功的请求。如果请求仍然不成功,则增加休眠时间并重复该过程。这将持续到请求成功或达到最大重试次数为止。

这种方法有很多好处:

  • 自动重试可以让您在不发生崩溃或丢失数据的情况下从速率限制错误中恢复。
  • 指数回退意味着您的第一次重试可以很快尝试,同时仍然受益于更长的延迟,如果您的第一次重试失败。
  • 向延迟添加随机抖动可以防止所有重试同时发生。

请注意,不成功的请求会影响您的每分钟限制,因此持续重新发送请求是行不通的。

以下是Python的一些示例解决方案,它们使用指数回退。

使用 Tenacity 库

Tenacity是一个 Apache 2.0 许可的通用重试库,用 Python 编写,旨在简化向几乎任何内容添加重试行为的任务。

要向您的请求添加指数回退,您可以使用tenacity.retry装饰器。下面的示例使用tenacity.wait_random_exponential函数向请求添加随机指数回退。

from openai import OpenAI
client = OpenAI()

from tenacity import retry, stop_after_attempt, wait_random_exponential

@retry(wait=wait_random_exponential(min=1, max=60), stop=stop_after_attempt(6))
def completion_with_backoff(**kwargs):
    return client.completions.create(**kwargs)

completion_with_backoff(model="gpt-3.5-turbo-instruct", prompt="Once upon a time,")

请注意,Tenacity 库是第三方工具,OpenAI 不对其可靠性或安全性做出任何保证。

使用 backoff 库

Backoff是另一个 Python 库,它提供 backoff 和重试功能的函数装饰器。

import backoff
import openai
from openai import OpenAI
client = OpenAI()

@backoff.on_exception(backoff.expo, openai.RateLimitError)
def completions_with_backoff(**kwargs):
    return client.completions.create(**kwargs)

completions_with_backoff(model="gpt-3.5-turbo-instruct", prompt="Once upon a time,")

与 Tenacity 一样,Backoff 库也是第三方工具,OpenAI 不对其可靠性或安全性做出任何保证。

手动回退实现

如果您不想使用第三方库,您可以按照以下示例实现自己的回退逻辑:

# imports
import random
import time

import openai
from openai import OpenAI
client = OpenAI()

# define a retry decorator
def retry_with_exponential_backoff(
    func,
    initial_delay: float = 1,
    exponential_base: float = 2,
    jitter: bool = True,
    max_retries: int = 10,
    errors: tuple = (openai.RateLimitError,),
):
    """Retry a function with exponential backoff."""

    def wrapper(*args, **kwargs):
        # Initialize variables
        num_retries = 0
        delay = initial_delay

        # Loop until a successful response or max_retries is hit or an exception is raised
        while True:
            try:
                return func(*args, **kwargs)

            # Retry on specific errors
            except errors as e:
                # Increment retries
                num_retries += 1

                # Check if max retries has been reached
                if num_retries > max_retries:
                    raise Exception(
                        f"Maximum number of retries ({max_retries}) exceeded."
                    )

                # Increment the delay
                delay *= exponential_base * (1 + jitter * random.random())

                # Sleep for the delay
                time.sleep(delay)

            # Raise exceptions for any errors not specified
            except Exception as e:
                raise e

    return wrapper

@retry_with_exponential_backoff
def completions_with_backoff(**kwargs):
    return client.completions.create(**kwargs)

completions_with_backoff(model="gpt-3.5-turbo-instruct", prompt="Once upon a time,")

OpenAI 不对其安全性或效率做出任何保证,但这可以成为您自己解决方案的良好开端。

将 max_tokens 减少到与您的完成相同的大小

您的速率限制是 max_tokens 和基于您的请求的字符数估计的最大值。尝试将 max_tokens 值设置为与预期响应大小尽可能接近。

批量请求

如果您的用例不需要即时响应,您可以使用批处理 API更轻松地提交和执行大量请求,而不会影响您的同步请求速率限制。

对于确实需要同步响应的用例,OpenAI API 对每分钟请求数每分钟令牌数具有单独的限制。

如果您达到了每分钟请求数的限制,但仍有可用的每分钟令牌数容量,则可以通过将多个任务批量处理到每个请求中来增加吞吐量。这将允许您在使用我们的小型模型时处理更多的令牌。

发送批量提示与发送单个请求的方式完全相同,只需将提示参数中的单个字符串更改为字符串列表即可。

没有批处理的示例

from openai import OpenAI
client = OpenAI()

num_stories = 10
prompt = "Once upon a time,"

# serial example, with one story completion per request
for _ in range(num_stories):
    response = client.completions.create(
        model="curie",
        prompt=prompt,
        max_tokens=20,
    )

    # print story
    print(prompt + response.choices[0].text)

批量示例

from openai import OpenAI
client = OpenAI()

num_stories = 10
prompts = ["Once upon a time,"] * num_stories

# batched example, with 10 story completions per request
response = client.completions.create(
    model="curie",
    prompt=prompts,
    max_tokens=20,
)

# match completions to prompts by index
stories = [""] * len(prompts)
for choice in response.choices:
    stories[choice.index] = prompts[choice.index] + choice.text

# print stories
for story in stories:
    print(story)

警告:响应对象可能不会按提示的顺序返回完成,因此始终记住使用索引字段将响应与提示匹配。

Was this page helpful?