速率限制 API 在 Google App Engine 上使用 Django 调用 Shopify API
Rate Limit API Calls to Shopify API with Django on Google App Engine
我正在尝试使用托管在 Google App Engine 上的 Django 应用中的 Shopify API。
对于我的本地单线程脚本,我使用 modified version of this 来确保我不会超过 Shopify 的速率限制:
# Setup local bucket to limit API Calls
bucket = TokenBucket(40, .5)
api_call_success = False
while not api_call_success:
if bucket.tokens < 1:
sleep(.5)
else:
[... do an API call ...]
bucket.consume(1)
api_call_success = True
这适用于我的本地脚本,但不适用于我的 Google App Engine 托管应用程序,其中可能有多个租户,并且同时发生多个会话。
我一直在努力研究处理这种速率限制的最佳方法,目前打算尝试不断地将每个 users/stores 请求响应 header 写入内存缓存,以便我可以随时检查'x-shopify-shop-api-call-limit' 查看之前的通话限制(和通话时间)。所以我尝试了这样的事情:
fill_rate = .5
capacity = 40
# get memcache key info
last_call_time = memcache.get(memKey+"_last_call_time")
last_call_value = memcache.get(memKey+"_last_call_value")
# Calculate how many tokens should be available
now = datetime.datetime.utcnow()
delta = fill_rate * ((now - last_call_time).seconds)
tokensAvailable = min(capacity, delta + last_call_value)
# Check if we can perform operation
if tokensAvailble > 1:
[... Some work involving Shopify API call ...]
# Do some work and then update memcache
memcache.set_multi( {"_last_call_time": datetime.datetime.strptime(resp_dict['date'], '%a, %d %b %Y %H:%M:%S %Z'), "_last_call_value": resp_dict['x-shopify-shop-api-call-limit'].split('/',1)[0]}, key_prefix=memKey, time=120)
else:
[... wait ...]
谁能推荐一个更好的方法来管理这个速率限制?
Shopify 分发带有速率限制代码的 CLI Ruby gem。由于 Ruby 和 Python 在句法上很接近,因此您阅读它们的代码应该没有什么困难。由于 Shopify 代码通常是按照高标准编写的,如果你掠夺他们的模式并转换为 Python,你应该能够在 GAE 上正常运行。
我基本上和你有相同的逻辑(使用 Redis),但是我没有在任何地方做这个内联,而是猴子修补 shopify.base.ShopifyConnection
像这样:
from time import sleep
from django.conf import settings
from pyactiveresource.activeresource import formats
from pyactiveresource.connection import (
Connection,
ConnectionError,
ServerError,
)
import shopify
class ShopifyConnection(Connection, object):
response = None
def __init__(self, site, user=None, password=None, timeout=None,
format=formats.JSONFormat):
super(ShopifyConnection, self).__init__(site, user, password, timeout, format)
def consume_token(uid, capacity, rate, min_interval=0):
# Your rate limiting logic here
def _open(self, *args, **kwargs):
uid = self.site.split("https://")[-1].split(".myshopify.com")[0]
self.response = None
retries = 0
while True:
try:
self.consume_token(uid, 40, 1.95, 0.05)
self.response = super(ShopifyConnection, self)._open(*args, **kwargs)
return self.response
except (ConnectionError, ServerError) as err:
retries += 1
if retries > settings.SHOPIFY_MAX_RETRIES:
self.response = err.response
raise
sleep(settings.SHOPIFY_RETRY_WAIT)
shopify.base.ShopifyConnection = ShopifyConnection
您需要将此代码保存在应用目录中的一个文件中,然后将该文件导入应用的 __init__.py
。这样,我就可以编写其余的代码,而不必担心任何速率限制逻辑。您唯一需要注意的是在更新 shopify
模块时检查 ShopifyConnection
class 是否发生变化,并相应地更新猴子补丁。这不是什么大问题,因为模块不经常更新。
(如您所见,我将这个猴子补丁作为插入重试逻辑的机会,因为大约 1/1000 的请求无故失败。我定义了 SHOPIFY_MAX_RETRIES
和 SHOPIFY_RETRY_WAIT
in settings.py
并在此处提取值。)
import logging
from google.appengine.api import memcache
import datetime
from datetime import date, timedelta
from django.conf import settings
from time import sleep
# Store the response from the last request in the connection object
class ShopifyConnection(pyactiveresource.connection.Connection):
response = None
def __init__(self, site, user=None, password=None, timeout=None,
format=formats.JSONFormat):
super(ShopifyConnection, self).__init__(site, user, password, timeout, format)
def consume_token(self, uid, capacity, rate, min_interval):
# Get this users last UID
last_call_time = memcache.get(uid+"_last_call_time")
last_call_value = memcache.get(uid+"_last_call_value")
if last_call_time and last_call_value:
# Calculate how many tokens are regenerated
now = datetime.datetime.utcnow()
delta = rate * ((now - last_call_time).seconds)
# If there is no change in time then check what last call value was
if delta == 0:
tokensAvailable = min(capacity, capacity - last_call_value)
# If there was a change in time, how much regen has occurred
else:
tokensAvailable = min(capacity, (capacity - last_call_value) + delta)
# No tokens available can't call
if tokensAvailable <= min_interval:
raise pyactiveresource.connection.ConnectionError(message="No tokens available for: " + str(uid))
def _open(self, *args, **kwargs):
uid = self.site.split("https://")[-1].split(".myshopify.com")[0]
self.response = None
retries = 0
while True:
try:
self.consume_token(uid, 40, 2, settings.SHOPIFY_MIN_TOKENS)
self.response = super(ShopifyConnection, self)._open(*args, **kwargs)
# Set the memcache reference
memcache.set_multi( {
"_last_call_time": datetime.datetime.strptime(self.response.headers['date'], '%a, %d %b %Y %H:%M:%S %Z'), "_last_call_value": int(self.response.headers['x-shopify-shop-api-call-limit'].split('/',1)[0])},
key_prefix=uid, time=25)
return self.response
except (pyactiveresource.connection.ConnectionError, pyactiveresource.connection.ServerError) as err:
retries += 1
if retries > settings.SHOPIFY_MAX_RETRIES:
self.response = err.response
logging.error("Logging error for _open ShopifyConnection: " + str(uid) + ":" + str(err.message))
raise
sleep(settings.SHOPIFY_RETRY_WAIT)
多亏了用户 Julien,我想我能够做到这一点。只是想通过测试和反馈确认没有任何疏忽。
我正在尝试使用托管在 Google App Engine 上的 Django 应用中的 Shopify API。
对于我的本地单线程脚本,我使用 modified version of this 来确保我不会超过 Shopify 的速率限制:
# Setup local bucket to limit API Calls
bucket = TokenBucket(40, .5)
api_call_success = False
while not api_call_success:
if bucket.tokens < 1:
sleep(.5)
else:
[... do an API call ...]
bucket.consume(1)
api_call_success = True
这适用于我的本地脚本,但不适用于我的 Google App Engine 托管应用程序,其中可能有多个租户,并且同时发生多个会话。
我一直在努力研究处理这种速率限制的最佳方法,目前打算尝试不断地将每个 users/stores 请求响应 header 写入内存缓存,以便我可以随时检查'x-shopify-shop-api-call-limit' 查看之前的通话限制(和通话时间)。所以我尝试了这样的事情:
fill_rate = .5
capacity = 40
# get memcache key info
last_call_time = memcache.get(memKey+"_last_call_time")
last_call_value = memcache.get(memKey+"_last_call_value")
# Calculate how many tokens should be available
now = datetime.datetime.utcnow()
delta = fill_rate * ((now - last_call_time).seconds)
tokensAvailable = min(capacity, delta + last_call_value)
# Check if we can perform operation
if tokensAvailble > 1:
[... Some work involving Shopify API call ...]
# Do some work and then update memcache
memcache.set_multi( {"_last_call_time": datetime.datetime.strptime(resp_dict['date'], '%a, %d %b %Y %H:%M:%S %Z'), "_last_call_value": resp_dict['x-shopify-shop-api-call-limit'].split('/',1)[0]}, key_prefix=memKey, time=120)
else:
[... wait ...]
谁能推荐一个更好的方法来管理这个速率限制?
Shopify 分发带有速率限制代码的 CLI Ruby gem。由于 Ruby 和 Python 在句法上很接近,因此您阅读它们的代码应该没有什么困难。由于 Shopify 代码通常是按照高标准编写的,如果你掠夺他们的模式并转换为 Python,你应该能够在 GAE 上正常运行。
我基本上和你有相同的逻辑(使用 Redis),但是我没有在任何地方做这个内联,而是猴子修补 shopify.base.ShopifyConnection
像这样:
from time import sleep
from django.conf import settings
from pyactiveresource.activeresource import formats
from pyactiveresource.connection import (
Connection,
ConnectionError,
ServerError,
)
import shopify
class ShopifyConnection(Connection, object):
response = None
def __init__(self, site, user=None, password=None, timeout=None,
format=formats.JSONFormat):
super(ShopifyConnection, self).__init__(site, user, password, timeout, format)
def consume_token(uid, capacity, rate, min_interval=0):
# Your rate limiting logic here
def _open(self, *args, **kwargs):
uid = self.site.split("https://")[-1].split(".myshopify.com")[0]
self.response = None
retries = 0
while True:
try:
self.consume_token(uid, 40, 1.95, 0.05)
self.response = super(ShopifyConnection, self)._open(*args, **kwargs)
return self.response
except (ConnectionError, ServerError) as err:
retries += 1
if retries > settings.SHOPIFY_MAX_RETRIES:
self.response = err.response
raise
sleep(settings.SHOPIFY_RETRY_WAIT)
shopify.base.ShopifyConnection = ShopifyConnection
您需要将此代码保存在应用目录中的一个文件中,然后将该文件导入应用的 __init__.py
。这样,我就可以编写其余的代码,而不必担心任何速率限制逻辑。您唯一需要注意的是在更新 shopify
模块时检查 ShopifyConnection
class 是否发生变化,并相应地更新猴子补丁。这不是什么大问题,因为模块不经常更新。
(如您所见,我将这个猴子补丁作为插入重试逻辑的机会,因为大约 1/1000 的请求无故失败。我定义了 SHOPIFY_MAX_RETRIES
和 SHOPIFY_RETRY_WAIT
in settings.py
并在此处提取值。)
import logging
from google.appengine.api import memcache
import datetime
from datetime import date, timedelta
from django.conf import settings
from time import sleep
# Store the response from the last request in the connection object
class ShopifyConnection(pyactiveresource.connection.Connection):
response = None
def __init__(self, site, user=None, password=None, timeout=None,
format=formats.JSONFormat):
super(ShopifyConnection, self).__init__(site, user, password, timeout, format)
def consume_token(self, uid, capacity, rate, min_interval):
# Get this users last UID
last_call_time = memcache.get(uid+"_last_call_time")
last_call_value = memcache.get(uid+"_last_call_value")
if last_call_time and last_call_value:
# Calculate how many tokens are regenerated
now = datetime.datetime.utcnow()
delta = rate * ((now - last_call_time).seconds)
# If there is no change in time then check what last call value was
if delta == 0:
tokensAvailable = min(capacity, capacity - last_call_value)
# If there was a change in time, how much regen has occurred
else:
tokensAvailable = min(capacity, (capacity - last_call_value) + delta)
# No tokens available can't call
if tokensAvailable <= min_interval:
raise pyactiveresource.connection.ConnectionError(message="No tokens available for: " + str(uid))
def _open(self, *args, **kwargs):
uid = self.site.split("https://")[-1].split(".myshopify.com")[0]
self.response = None
retries = 0
while True:
try:
self.consume_token(uid, 40, 2, settings.SHOPIFY_MIN_TOKENS)
self.response = super(ShopifyConnection, self)._open(*args, **kwargs)
# Set the memcache reference
memcache.set_multi( {
"_last_call_time": datetime.datetime.strptime(self.response.headers['date'], '%a, %d %b %Y %H:%M:%S %Z'), "_last_call_value": int(self.response.headers['x-shopify-shop-api-call-limit'].split('/',1)[0])},
key_prefix=uid, time=25)
return self.response
except (pyactiveresource.connection.ConnectionError, pyactiveresource.connection.ServerError) as err:
retries += 1
if retries > settings.SHOPIFY_MAX_RETRIES:
self.response = err.response
logging.error("Logging error for _open ShopifyConnection: " + str(uid) + ":" + str(err.message))
raise
sleep(settings.SHOPIFY_RETRY_WAIT)
多亏了用户 Julien,我想我能够做到这一点。只是想通过测试和反馈确认没有任何疏忽。