当指定数量的用户任务完成时停止 Locust

Stop Locust When Specified Number Of User Tasks Complete

在我的场景中,我是 运行 没有网络的蝗虫 UI。我使用的命令是

locust -f my_locust_file --no_web -c 20 -r 4     # as a hack I add -t 10s

这对应于每秒孵化 4 个用户,总共 20 个用户。

我的 objective 是让 20 个蝗虫用户中的每一个执行一个任务,我希望蝗虫 运行 完成并在最后一个用户(第 20 个用户)的任务完成时退出。收集的统计信息应仅包括与每个任务关联的响应时间。

在这个场景中,已经确定了 5 个任务(用户场景),它们可以与用户随机关联:

class UserScenarios(TaskSet):
    tasks = [Sequence_One, ServerSequence, Components_Sequence, Embedded_Sequence, Connectivity_Sequence]

class MyLocust(HttpLocust):
    def __init__(self):
        super().__init__()
        MyLocust.counter += 1
        print(f"Counter = {MyLocust.counter}")

    counter = 0
    task_set = UserScenarios
    wait_time = between(1, 1)
    host = 'https://*****.com'

每个任务(用户场景)对应于应按顺序加载的 3 或 4 个页面的不同序列。由 2 页组成的经过清理和简化的示例序列是:

class Sequence_One(TaskSequence):

    @seq_task(1)
    def get_task1(self):
        response = self.client.get(url='https://****',
                                   name='https://****',
                                   timeout=30,
                                   allow_redirects=False,
                                   headers={...})   

    @seq_task(2)
    def get_task2(self):
        response = self.client.get(url='https://****',
                                   name='https://****',
                                   timeout=30,
                                   allow_redirects=False,
                                   headers={...})  

有没有办法在第 20(第 n)个用户任务完成后停止测试?例如,如果每个任务访问 4 页,我希望测试在发出 20*4 = 80 页请求后终止。事实上,作为此测试的一部分,总共只应发出 80 个页面请求。

我对这个测试的经验是,页面请求在最后一个用户任务完成后继续进行,直到我手动停止测试或使用比任务实际需要完成的时间长一点的时间限制。

这实际上是可能的!虽然没有记录,但至少在我能找到的任何地方,可以通过调用来阻止蝗虫 提高 StopLocust()

在提供的场景中,您的每个任务都是一个 TaskSequence class。为了在执行单个任务后退出 Locust,您需要向整体 class 添加另一个步骤(函数),它会调用 StopLocust()。我在下面提供了一个示例 -

class Scenario(TaskSequence):

    @seq_task(1)
    def task_1(self):
        self.client.get("/going_somewhere")

    @seq_task(2)
    def task_2(self):
        self.client.get("/going_somewhere_else")

    @seq_task(3)
    def done(self):
        raise StopLocust()

默认情况下,Locust 被设置为永不结束,并且会不断地从提供给它的可用任务集中选择一个任务。提供 raise StopLocust() 将告诉当前用户 (Locust) 在当前任务结束时停止并且不再选择任务。

我用的是locust 1.3+ StopLocust() 已弃用

我的框架为每个用户创建了一个凭证元组列表和支持变量。我已将所有用户凭据、令牌、支持文件名等作为列表的一部分存储在这些元组中。 (实际上这是在启动 locust 之前自动完成的)

我将该列表导入 locustfile

# creds is created before running locust file and can be stored outside or part of locust # file
creds = [('demo_user1', 'pass1', 'lnla'),
         ('demo_user2', 'pass2', 'taam9'),
         ('demo_user3', 'pass3', 'wevee'),
         ('demo_user4', 'pass4', 'avwew')]

class RegisteredUser(SequentialTaskSet)

    def on_start(self):
        self.credentials = creds.pop()

    @task
    def task_one_name(self):
        task_one_commands

    @task
    def task_two_name(self):
        task_two_commands

    @task
    def stop(self):
        if len(creds) == 0:
            self.user.environment.reached_end = True
            self.user.environment.runner.quit()


class ApiUser(HttpUser):
    tasks = [RegisteredUser]
    host = 'hosturl'

我在任务中使用 self.credentials。 我在 class.

中创建了 stop 函数

此外,请注意 RegisteredUser 是从 SequentialTask​​Set 继承到 运行 所有任务的顺序。

使用

加注退出()

如果你想做出明确的命中,创建一个 sequirtial 任务集并 raise exit() raise exit 将停止该用户

#这是一种解决方法,因为没有其他方法对我有用。

headless模式下,如果想停止整个locust服务,可以使用os.kill

class UserTasks(TaskSet):
    @task
    def test_api(self):
        gobal FINISHED_USER, FINISHED_REQUEST
        with self.user.client.post(f"/{api_type}", json=<my data>, headers=<my headers>, catch_response=True) as response:
            # do something
            FINISHED_REQUEST += 1
        if FINISHED_REQUEST == schedule_num:
            # this user finish all its task, will go to on_stop
            self.user._state = "stopping"
            FINISHED_USER += 1
    
    def on_stop(self):
        global FINISHED_USER
        # do something
        # make sure its the last user who finishes its task
        if FINISHED_USER == <your user num>:
            os.kill(<your pid>, signal.SIGTERM)

class WebsiteUser(HttpUser):
    """
    User class that does requests to the locust web server running on localhost
    """
    host = "http://{}".format(<host>)
    wait_time = constant_throughput(1)
    tasks = [UserTasks]

您可以通过引发 locust.exception.StopUser 异常来停止单个用户:

from locust.exception import StopUser

class Scenario(TaskSequence):
    @seq_task(1)
    def task_1(self):
        self.client.get("/going_somewhere")

    @seq_task(2)
    def task_2(self):
        self.client.get("/going_somewhere_else")

    @seq_task(3)
    def done(self):
        raise StopUser()

P.S。 StopLocust 其他答案中提到的异常已弃用,取而代之的是 StopUser 异常。