你怎么知道一个 Locust 实例是 运行 作为 master 还是 worker?
How do you know if a Locust instance is running as master or worker?
这是我想做的一个简单示例。
在想运行一个安静的代码只有当它是高手。
class SimpleTaskSet(TaskSet):
@task
def do_something(self):
# do something ...
pass
class MyOwnLocust(Locust):
task_set = SimpleTaskSet
min_wait = 1
max_wait = 10
def setup(self):
if master: # How to here the code is running on master?
# do something
pass
else:
# do something else
pass
为了检查一个locust是master还是slave,你可以使用locust的runners
,然后检查当前的运行ner是否是Master的一个实例:
from locust import runners
... # Your code
def setup(self):
if isinstance(runners.locust_runner, runners.MasterLocustRunner):
# do something
pass
else:
# do something else
pass
但是请记住,Locust Master 不执行任务,也不会像 Locust Slave 那样执行所有功能。
编辑 - 对于第二个问题
据我所知,考虑到 Locust 的工作方式,运行在整个测试中仅使用一段代码可能非常棘手。
您可以做的第一件事就是像这样使用 TaskSet 方法setup
:
class Behavior(TaskSet):
def setup(self):
# Do things only once here
# The rest of your code
class User(HttpLocust):
task_set = Behavior
# ....
您可以在官方文档中阅读更多有关设置/on_start/on+stop 等的信息:https://docs.locust.io/en/stable/writing-a-locustfile.html?highlight=setup#setups-teardowns-on-start-and-on-stop
现在,这个设置有什么问题?我不知道你在这个设置部分想要实现什么,但重要的是要记住这个函数每次测试只会 运行 一次,但每个进程只会 运行 一次。
这是什么意思?如果您 运行 在分布式模式(主从设置)下进行 Locust 测试,setup
函数将为每个 python 进程 运行 一次(a.k.a。您开始测试的每个单独的从属终端)。
如果这对您来说不是问题,您可以轻松地使用设置功能来实现您想要实现的目标。但是,如果没有,您可能需要想出一种让 Locust 进程相互了解的方法,以便它们可以检查是否也需要 运行 setup
的内容或跳过阻止是因为从属进程已经 运行 它。这可能会很棘手。
根据 Locust 文档 https://docs.locust.io/en/stable/writing-a-locustfile.html#test-start-and-test-stop-events,当 运行 在分布式模式下运行时,test_start 事件只会在主机上 运行。
test_start 事件将在蝗虫初始化事件之后发生。
您还可以在模块级别使用上述技巧检查节点类型,并在任何工作人员或任务孵化之前使用 运行 代码。
看到这个稍微 out-of-date (locust 0.9) 例子 https://medium.com/locust-io-experiments/locust-experiments-feeding-the-locusts-cf09e0f65897.
Locust docs 显示这个例子:
@events.init.add_listener
def on_locust_init(environment, **_kwargs):
if not isinstance(environment.runner, MasterRunner):
environment.runner.register_message('test_users', setup_test_users)
if not isinstance(environment.runner, WorkerRunner):
environment.runner.register_message('acknowledge_users', on_acknowledge)
这对 OP 的问题没有直接用处,但您可以通过用户 class 到达 environment.runner
。我在 Locust 2.5.1 中按如下方式使用它:
@events.test_stop.add_listener
def test_stop(**kw):
if ('environment' in kw):
if isinstance(kw['environment'].runner, MasterRunner):
MySequentialTaskSet.print_stats()
这是我想做的一个简单示例。
在想运行一个安静的代码只有当它是高手。
class SimpleTaskSet(TaskSet):
@task
def do_something(self):
# do something ...
pass
class MyOwnLocust(Locust):
task_set = SimpleTaskSet
min_wait = 1
max_wait = 10
def setup(self):
if master: # How to here the code is running on master?
# do something
pass
else:
# do something else
pass
为了检查一个locust是master还是slave,你可以使用locust的runners
,然后检查当前的运行ner是否是Master的一个实例:
from locust import runners
... # Your code
def setup(self):
if isinstance(runners.locust_runner, runners.MasterLocustRunner):
# do something
pass
else:
# do something else
pass
但是请记住,Locust Master 不执行任务,也不会像 Locust Slave 那样执行所有功能。
编辑 - 对于第二个问题
据我所知,考虑到 Locust 的工作方式,运行在整个测试中仅使用一段代码可能非常棘手。
您可以做的第一件事就是像这样使用 TaskSet 方法setup
:
class Behavior(TaskSet):
def setup(self):
# Do things only once here
# The rest of your code
class User(HttpLocust):
task_set = Behavior
# ....
您可以在官方文档中阅读更多有关设置/on_start/on+stop 等的信息:https://docs.locust.io/en/stable/writing-a-locustfile.html?highlight=setup#setups-teardowns-on-start-and-on-stop
现在,这个设置有什么问题?我不知道你在这个设置部分想要实现什么,但重要的是要记住这个函数每次测试只会 运行 一次,但每个进程只会 运行 一次。
这是什么意思?如果您 运行 在分布式模式(主从设置)下进行 Locust 测试,setup
函数将为每个 python 进程 运行 一次(a.k.a。您开始测试的每个单独的从属终端)。
如果这对您来说不是问题,您可以轻松地使用设置功能来实现您想要实现的目标。但是,如果没有,您可能需要想出一种让 Locust 进程相互了解的方法,以便它们可以检查是否也需要 运行 setup
的内容或跳过阻止是因为从属进程已经 运行 它。这可能会很棘手。
根据 Locust 文档 https://docs.locust.io/en/stable/writing-a-locustfile.html#test-start-and-test-stop-events,当 运行 在分布式模式下运行时,test_start 事件只会在主机上 运行。 test_start 事件将在蝗虫初始化事件之后发生。 您还可以在模块级别使用上述技巧检查节点类型,并在任何工作人员或任务孵化之前使用 运行 代码。 看到这个稍微 out-of-date (locust 0.9) 例子 https://medium.com/locust-io-experiments/locust-experiments-feeding-the-locusts-cf09e0f65897.
Locust docs 显示这个例子:
@events.init.add_listener
def on_locust_init(environment, **_kwargs):
if not isinstance(environment.runner, MasterRunner):
environment.runner.register_message('test_users', setup_test_users)
if not isinstance(environment.runner, WorkerRunner):
environment.runner.register_message('acknowledge_users', on_acknowledge)
这对 OP 的问题没有直接用处,但您可以通过用户 class 到达 environment.runner
。我在 Locust 2.5.1 中按如下方式使用它:
@events.test_stop.add_listener
def test_stop(**kw):
if ('environment' in kw):
if isinstance(kw['environment'].runner, MasterRunner):
MySequentialTaskSet.print_stats()