Google Translate API timeout
I have roughly 20,000 articles to translate, each averaging around 100 characters in length. I'm using the multiprocessing library to speed up my API calls, as shown below:
from google.cloud.translate_v2 import Client
from time import sleep
from tqdm.notebook import tqdm
import multiprocessing as mp
import os

os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = cred_file
translate_client = Client()

def trans(text, MAX_TRIES=5):
    res = None
    sleep_time = 1
    for i in range(MAX_TRIES):
        try:
            res = translate_client.translate(text, target_language="en", model="nmt")
        except Exception:
            pass
        if res is None:
            sleep(sleep_time)  # back off before trying to fetch the data again
            sleep_time *= 2
        else:
            break
    return res["translatedText"]

src_text = # eg. ["this is a sentence"]*20000

with mp.Pool(mp.cpu_count()) as pool:
    translated = list(tqdm(pool.imap(trans, src_text), total=len(src_text)))
Unfortunately, the code above fails every time at around iteration 2828 ± 5 (HTTP Error 503: Service Unavailable). I had hoped the variable sleep time would let it recover and run normally. The strange thing is that if I restart the loop immediately, it starts up again without issue, even though < 2^4 seconds have passed since the code finished executing. So the questions are:
- Am I doing the try/except part wrong?
- Is running multiprocessing somehow affecting the API?
- General thoughts?
I need multiprocessing, because otherwise I'd be waiting roughly 3 hours for the whole thing to finish.
A 503 error implies the issue is on Google's side, which leads me to believe you may be hitting a rate limit. As Raphael mentioned, is there a Retry-After header in the response? I'd recommend inspecting the response headers, as they may tell you more specifically what's happening and possibly give you information on how to fix it.
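As a small sketch of how you could act on that header (the helper name is mine, not from the question), note that Retry-After may be either delta-seconds or an HTTP-date per the HTTP spec, so a parser needs to handle both:

```python
from email.utils import parsedate_to_datetime
from datetime import datetime, timezone

def retry_after_seconds(header_value, now=None):
    """Convert a Retry-After header (delta-seconds or HTTP-date) into seconds to wait."""
    if header_value is None:
        return None
    try:
        return max(0, int(header_value))  # e.g. "120"
    except ValueError:
        pass
    try:
        when = parsedate_to_datetime(header_value)  # e.g. "Wed, 21 Oct 2015 07:28:00 GMT"
    except (TypeError, ValueError):
        return None  # unparseable value
    now = now or datetime.now(timezone.utc)
    return max(0, (when - now).total_seconds())
```

You would feed this the header from the failed response and sleep for that long before retrying, instead of guessing a backoff interval.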
A few thoughts: having tried Google APIs before, they can only handle a certain number of concurrent requests. If that limit is reached, the service returns the error HTTP 503 "Service Unavailable", or HTTP 403 if the Daily Limit is exceeded or the User Rate Limit is hit.
Try implementing retries with exponential backoff: retry the operation with an exponentially growing wait time, up to a maximum number of retries. It improves bandwidth usage and maximizes request throughput in concurrent environments.
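As a minimal sketch of that backoff pattern (the wrapper and its parameter names are illustrative, not from the question), with jitter added so concurrent workers don't retry in lockstep:

```python
import random
import time

def with_backoff(fn, max_tries=5, base_delay=1.0, sleep=time.sleep):
    """Call fn(); on failure wait base_delay * 2**attempt (+ jitter), then retry."""
    for attempt in range(max_tries):
        try:
            return fn()
        except Exception:
            if attempt == max_tries - 1:
                raise  # out of retries: surface the last error
            delay = base_delay * (2 ** attempt) + random.uniform(0, 1)
            sleep(delay)

# Demo: a function that fails twice, then succeeds on the third attempt.
calls = {"n": 0}
def flaky():
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError("503 Service Unavailable")
    return "ok"

print(with_backoff(flaky, sleep=lambda s: None))  # prints "ok" after two simulated retries
```

The `sleep` parameter is injected so the demo (and tests) can skip the real waits; in production you would leave the default.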
Also take a look at the Quotas and Limits page.
The Google API is very good at hiding the complexity of performing a Google Translation. Unfortunately, if you step into the Google API code, it's using standard HTTP requests. This means that when you're running 20,000+ requests, there will be a huge bottleneck regardless of the thread pool.
Consider creating the HTTP requests with aiohttp (you'll need to install it from pip) and asyncio. This will allow you to run asynchronous HTTP requests. (It also means you don't need google.cloud.translate_v2, multiprocessing, or tqdm.notebook.)
Simply call an awaited method in asyncio.run(); that method can create an array of coroutines that perform aiohttp.session.get(). Then call asyncio.gather() to collect all the results.
In the example below I used an API key from https://console.cloud.google.com/apis/credentials (instead of Google Application Credentials / a service account).
Using your example with asyncio and aiohttp, it ran in 30 seconds without any errors. (You may want to extend the timeout on the session, though.)
It's worth pointing out that Google's limit is 6 million characters per minute. Your test is doing 360,000, so you'll hit the limit if you run the test 17 times within a minute!
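That quota arithmetic is easy to sanity-check (the 6,000,000 characters/minute figure is this answer's claim; the character count is for the question's literal sample sentence):

```python
sentence = "this is a sentence"            # the question's sample sentence (18 characters)
chars_per_run = len(sentence) * 20000      # 20,000 requests per test run
quota_per_minute = 6_000_000               # claimed character quota per minute
runs_within_quota = quota_per_minute // chars_per_run

print(chars_per_run)      # 360000 characters per run
print(runs_within_quota)  # 16 full runs fit in a minute; run 17 crosses the limit
```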
Also, the speed is mainly determined by the machine rather than the Google API. (I ran my tests on a computer with a 3 GHz CPU, 8 cores, and 16 GB of RAM.)
import asyncio
import aiohttp
from collections import namedtuple
import json
from urllib.parse import quote

TranslateReponseModel = namedtuple('TranslateReponseModel', ['sourceText', 'translatedText', 'detectedSourceLanguage'])  # model to store results.

def Logger(json_message):
    print(json.dumps(json_message))  # Note: logging json is just my personal preference.

async def DownloadString(session, url, index):
    while True:  # If client error - this will retry. You may want to limit the amount of attempts
        try:
            r = await session.get(url)
            text = await r.text()
            #Logger({"data": text, "status": r.status})
            r.raise_for_status()  # This will error if the API returns a 4xx or 5xx status.
            return text
        except aiohttp.ClientConnectionError as e:
            Logger({'Exception': f"Index {index} - connection was dropped before we finished", 'Details': str(e), 'Url': url})
        except aiohttp.ClientError as e:
            Logger({'Exception': f"Index {index} - something went wrong. Not a connection error, that was handled", 'Details': str(e), 'Url': url})

def FormatResponse(sourceText, responseText):
    jsonResponse = json.loads(responseText)
    return TranslateReponseModel(sourceText, jsonResponse["data"]["translations"][0]["translatedText"], jsonResponse["data"]["translations"][0]["detectedSourceLanguage"])

def TranslatorUriBuilder(targetLanguage, sourceText):
    apiKey = 'ABCDED1234'  # TODO This is a 41-character API key. You'll need to generate one (it's not part of the json certificate)
    return f"https://translation.googleapis.com/language/translate/v2?key={apiKey}&q={quote(sourceText)}&target={targetLanguage}"

async def Process(session, sourceText, lineNumber):
    translateUri = TranslatorUriBuilder('en', sourceText)  # Target language is set to en (English)
    translatedResponseText = await DownloadString(session, translateUri, lineNumber)
    response = FormatResponse(sourceText, translatedResponseText)
    return response

async def main():
    statements = ["this is another sentence"] * 20000
    Logger({'Message': f'Start running Google Translate API for {len(statements)}'})
    results = []
    async with aiohttp.ClientSession() as session:
        results = await asyncio.gather(*[Process(session, val, idx) for idx, val in enumerate(statements)])
    Logger({'Message': f'Results are: {", ".join(map(str, [x.translatedText for x in results]))}'})
    Logger({'Message': f'Finished running Google Translate API for {str(len(statements))} and got {str(len(results))} results'})

if __name__ == '__main__':
    asyncio.run(main())
Additional testing
The initial test ran the same translation every time, so I created a test to check that the results aren't being cached on Google's side. I manually copied an ebook into a text file. Then in Python, the code opens the file, groups the text into an array of 100-character chunks, takes the first 20,000 items from the array, and translates each one. Interestingly, it still took under 30 seconds.
import asyncio
import aiohttp
from collections import namedtuple
import json
from urllib.parse import quote

TranslateReponseModel = namedtuple('TranslateReponseModel', ['sourceText', 'translatedText', 'detectedSourceLanguage'])  # model to store results.

def Logger(json_message):
    print(json.dumps(json_message))  # Note: logging json is just my personal preference.

async def DownloadString(session, url, index):
    while True:  # If client error - this will retry. You may want to limit the amount of attempts
        try:
            r = await session.get(url)
            text = await r.text()
            #Logger({"data": text, "status": r.status})
            r.raise_for_status()  # This will error if the API returns a 4xx or 5xx status.
            return text
        except aiohttp.ClientConnectionError as e:
            Logger({'Exception': f"Index {index} - connection was dropped before we finished", 'Details': str(e), 'Url': url})
        except aiohttp.ClientError as e:
            Logger({'Exception': f"Index {index} - something went wrong. Not a connection error, that was handled", 'Details': str(e), 'Url': url})

def FormatResponse(sourceText, responseText):
    jsonResponse = json.loads(responseText)
    return TranslateReponseModel(sourceText, jsonResponse["data"]["translations"][0]["translatedText"], jsonResponse["data"]["translations"][0]["detectedSourceLanguage"])

def TranslatorUriBuilder(targetLanguage, sourceText):
    apiKey = 'ABCDED1234'  # TODO This is a 41-character API key. You'll need to generate one (it's not part of the json certificate)
    return f"https://translation.googleapis.com/language/translate/v2?key={apiKey}&q={quote(sourceText)}&target={targetLanguage}"

async def Process(session, sourceText, lineNumber):
    translateUri = TranslatorUriBuilder('en', sourceText)  # Target language is set to en (English)
    translatedResponseText = await DownloadString(session, translateUri, lineNumber)
    response = FormatResponse(sourceText, translatedResponseText)
    return response

def readEbook():
    # This is a simple test to make sure the response is not cached.
    # I grabbed a random online pdf (http://sd.blackball.lv/library/Beginning_Software_Engineering_(2015).pdf) and copied the text into notepad.
    with open(r"C:\Dev\ebook.txt", "r", encoding="utf8") as f:
        return f.read()

def chunkText(text):
    chunk_size = 100
    text_length = len(text)
    chunk_array = [text[i:i + chunk_size] for i in range(0, text_length, chunk_size)]
    formatResults = [x for x in chunk_array if len(x) > 10]
    return formatResults[:20000]

async def main():
    data = readEbook()
    chunk_data = chunkText(data)
    Logger({'Message': f'Start running Google Translate API for {len(chunk_data)}'})
    results = []
    async with aiohttp.ClientSession() as session:
        results = await asyncio.gather(*[Process(session, val, idx) for idx, val in enumerate(chunk_data)])
    Logger({'Message': f'Results are: {", ".join(map(str, [x.translatedText for x in results]))}'})
    Logger({'Message': f'Finished running Google Translate API for {str(len(chunk_data))} and got {str(len(results))} results'})

if __name__ == '__main__':
    asyncio.run(main())
Finally, you can find more information about the Google Translate API HTTP request at https://cloud.google.com/translate/docs/reference/rest/v2/translate, and you can run the request through Postman.
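If you want to replay the call in Postman or curl, here is a sketch of assembling the v2 REST URL by hand (the key below is a placeholder, not a real credential):

```python
from urllib.parse import quote

def build_translate_url(api_key, source_text, target_language="en"):
    """Build the GET form of the v2 translate endpoint (key, q, target query parameters)."""
    return ("https://translation.googleapis.com/language/translate/v2"
            f"?key={api_key}&q={quote(source_text)}&target={target_language}")

url = build_translate_url("YOUR_API_KEY", "this is a sentence")
print(url)  # paste this into Postman or `curl` to inspect the raw JSON response
```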