Python 的重试装饰器的动态参数
Dynamic parameters with Python's retry decorator
我目前正在使用 python retrying package,其中包含 @retry
装饰器,该装饰器具有许多可选参数。我为我们的生产环境正确设置了这些参数,在重试之间有足够长的等待时间(低于它设置为 2000 毫秒),但我想为单元测试目的不同地设置这些值,以便执行非常快。
例如,这里的 wait_fixed 生产时间设置为 2000 毫秒,但对于调用 some_function()
的单元测试,我想将 wait_fixed
参数覆盖为 1毫秒,所以它执行得非常快。
@retry(stop_max_attempt_number=3, wait_fixed=2000)
def some_function(self):
return True
我 运行 遇到的问题是装饰器是 interpreted when the function is defined 所以我还没有找到一种方法来覆盖我的单元测试中的 wait_fixed
参数。
main.py
import settings
from retrying import retry
@retry(stop_max_attempt_number=settings.STOP_MAX_ATTEMPT_NUMBER,
wait_fixed=settings.WAIT_FIXED)
def some_function(self):
return True
settings.py
import json
with open('config.json') as jsonf:
config = json.loads(jsonf.read())
WAIT_FIXED=int(config['WAIT_FIXED'])
STOP_MAX_ATTEMPT_NUMBER=int(config['STOP_MAX_ATTEMPT_NUMBER'])
config.json
{
"STOP_MAX_ATTEMPT_NUMBER" : "3",
"WAIT_FIXED" : "2000"
}
让您的测试运行器放置一个合适的 config.json
。
我需要能够在调用函数时动态设置重试参数,下面的方法对我来说效果很好:
import random
from retrying import retry
class MyClass(object):
def try_stuff(self, string, attempts, wait):
self.do_something_with_retry = retry(stop_max_attempt_number=attempts,
wait_fixed=wait)(self.do_something_unreliable)
return self.do_something_with_retry(string)
def do_something_unreliable(self, string):
print(string)
if random.randint(0, 10) > 1:
raise IOError("Broken sauce, everything is hosed!!!111one")
else:
return "Awesome sauce!"
instance = MyClass()
print instance.try_stuff('test', 10, 2000)
我最近遇到一个问题,def
在 def
中解决了这个问题。
我需要在 get_url
函数上使用重试装饰器,但需要为 stop_max_attempt_number
传递可配置值。我无法想出 get_url
,以及同一缩进的其他函数。
为了解决这个问题,我在get_analytic
函数中定义了get_url
函数。举例说明:
def get_analytics(conf: Configuration, assets: list, cache: dict) -> list:
STOP_MAX_ATTEMPT_NUMBER = int(conf.num_retry)
WAIT_FIXED = int(conf.retry_timeout)
@retry(stop_max_attempt_number=STOP_MAX_ATTEMPT_NUMBER, wait_fixed=WAIT_FIXED)
def get_url(disable_https: bool, url: str, user: str, password: str) -> Response:
我目前正在使用 python retrying package,其中包含 @retry
装饰器,该装饰器具有许多可选参数。我为我们的生产环境正确设置了这些参数,在重试之间有足够长的等待时间(低于它设置为 2000 毫秒),但我想为单元测试目的不同地设置这些值,以便执行非常快。
例如,这里的 wait_fixed 生产时间设置为 2000 毫秒,但对于调用 some_function()
的单元测试,我想将 wait_fixed
参数覆盖为 1毫秒,所以它执行得非常快。
@retry(stop_max_attempt_number=3, wait_fixed=2000)
def some_function(self):
return True
我 运行 遇到的问题是装饰器是 interpreted when the function is defined 所以我还没有找到一种方法来覆盖我的单元测试中的 wait_fixed
参数。
main.py
import settings
from retrying import retry
@retry(stop_max_attempt_number=settings.STOP_MAX_ATTEMPT_NUMBER,
wait_fixed=settings.WAIT_FIXED)
def some_function(self):
return True
settings.py
import json
with open('config.json') as jsonf:
config = json.loads(jsonf.read())
WAIT_FIXED=int(config['WAIT_FIXED'])
STOP_MAX_ATTEMPT_NUMBER=int(config['STOP_MAX_ATTEMPT_NUMBER'])
config.json
{
"STOP_MAX_ATTEMPT_NUMBER" : "3",
"WAIT_FIXED" : "2000"
}
让您的测试运行器放置一个合适的 config.json
。
我需要能够在调用函数时动态设置重试参数,下面的方法对我来说效果很好:
import random
from retrying import retry
class MyClass(object):
def try_stuff(self, string, attempts, wait):
self.do_something_with_retry = retry(stop_max_attempt_number=attempts,
wait_fixed=wait)(self.do_something_unreliable)
return self.do_something_with_retry(string)
def do_something_unreliable(self, string):
print(string)
if random.randint(0, 10) > 1:
raise IOError("Broken sauce, everything is hosed!!!111one")
else:
return "Awesome sauce!"
instance = MyClass()
print instance.try_stuff('test', 10, 2000)
我最近遇到一个问题,def
在 def
中解决了这个问题。
我需要在 get_url
函数上使用重试装饰器,但需要为 stop_max_attempt_number
传递可配置值。我无法想出 get_url
,以及同一缩进的其他函数。
为了解决这个问题,我在get_analytic
函数中定义了get_url
函数。举例说明:
def get_analytics(conf: Configuration, assets: list, cache: dict) -> list:
STOP_MAX_ATTEMPT_NUMBER = int(conf.num_retry)
WAIT_FIXED = int(conf.retry_timeout)
@retry(stop_max_attempt_number=STOP_MAX_ATTEMPT_NUMBER, wait_fixed=WAIT_FIXED)
def get_url(disable_https: bool, url: str, user: str, password: str) -> Response: