How to set goals for Locust?

We have a Locust load/performance test (in a docker-compose setup). It currently runs on our build server.

Ideally, we would like the build job to fail if certain requirements are not met.

For example, a required average response time, or a minimum number of requests within a given timeout.

The expectations/requirements have to be compared against the aggregated data, so not inside individual (Python) test methods.

One option would be to parse the generated report, but I would think Locust has built-in support for what I have in mind.

Have a look at locust-plugins, in particular its custom command line options such as --check-rps, --check-fail-ratio and --check-avg-response-time:

https://github.com/SvenskaSpel/locust-plugins/blob/master/examples/cmd_line_examples.sh
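
For instance, a headless run with those checks could look roughly like this (a sketch: the locustfile name, user count, run time and thresholds are placeholders, and the locustfile needs to import locust_plugins so that the extra options are registered):

locust -f locustfile.py --headless -u 50 -r 10 -t 5m \
    --check-rps 500 --check-fail-ratio 0.01 --check-avg-response-time 50

If any of the checks fail, Locust exits with a non-zero exit code, which is what lets the build job fail.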

To validate KPIs automatically, you can create a custom plugin that uses Locust's event hooks and its internal statistics. The overall plugin design is quite simple:

  1. Register a listener on the quitting event
  2. Gather all statistics and serialize them
  3. Calculate the missing metrics (RPS, percentiles, ...)
  4. Validate the provided KPI definitions
  5. Check the provided KPIs against the actual measurements

The complete KPI plugin code looks like this:

import logging
from enum import Enum
from typing import List

import locust.env
from locust.stats import calculate_response_time_percentile


class Metric(Enum):
    ERROR_RATE = 'error_rate'
    PERCENTILE_90 = 'percentile_90'
    RPS = 'rps'

    @staticmethod
    def has_value(item):
        return item in [v.value for v in Metric.__members__.values()]


class KpiPlugin:
    """Validates aggregated Locust stats against per-endpoint KPIs when the test quits."""
    def __init__(
            self,
            env: locust.env.Environment,
            kpis: List,
    ):
        self.env = env
        self.kpis = kpis
        self.errors = []
        self._validate_kpis()

        events = self.env.events
        events.quitting.add_listener(self.quitting)  # pyre-ignore

    # Runs on Locust's quitting event: serialize stats, compute derived metrics, check KPIs
    def quitting(self, environment):
        serialized_stats = self.serialize_stats(self.env.stats)
        updated_stats = self._update_data(serialized_stats)
        self._kpi_check(updated_stats)
        self._interpret_errors()

    def serialize_stats(self, stats):
        return [stats.entries[key].serialize() for key in stats.entries.keys() if
                not (stats.entries[key].num_requests == 0 and stats.entries[key].num_failures == 0)]

    def _update_data(self, stats):
        for stat in stats:
            stat['error_rate'] = self._calculate_fail_rate(stat)
            stat['percentile_90'] = self._calculate_percentile(stat, 0.90)
            stat['rps'] = self._calculate_rps(stat)
        return stats

    def _calculate_rps(self, stat):
        rps = stat['num_reqs_per_sec']
        num_of_measurements = len(rps)
        return sum(rps.values()) / num_of_measurements

    def _calculate_fail_rate(self, stat):
        num_failures = stat['num_failures']
        num_requests = stat["num_requests"]
        return (num_failures / num_requests) * 100

    def _calculate_percentile(self, stat, percentile):
        response_times = stat['response_times']
        num_requests = stat['num_requests']
        return calculate_response_time_percentile(response_times, num_requests, percentile)

    def _kpi_check(self, stats):
        if len(stats) == 0:
            return

        for kpi in self.kpis:
            name = list(kpi.keys())[0]
            # Default to None so a KPI for an endpoint without recorded stats is simply skipped
            stat = next((stat for stat in stats if stat["name"] == name), None)
            if stat:
                kpi_settings = kpi[list(kpi.keys())[0]]
                for kpi_setting in kpi_settings:
                    self._metrics_check(kpi_setting, stat)

    def _metrics_check(self, kpi_setting, stat):
        (metric, value) = kpi_setting
        name = stat["name"]
        # The checks rely on short-circuiting: _log_error only runs when a KPI is violated
        if metric == Metric.ERROR_RATE.value:
            error_rate = stat['error_rate']
            error_rate <= value or self._log_error(error_rate, kpi_setting, name)
        if metric == Metric.PERCENTILE_90.value:
            percentile = stat['percentile_90']
            percentile <= value or self._log_error(percentile, kpi_setting, name)
        if metric == Metric.RPS.value:
            rps = stat['rps']
            rps >= value or self._log_error(rps, kpi_setting, name)

    def _log_error(self, stat_value, kpi_settings, name):
        (metric, value) = kpi_settings
        self.errors.append(
            f"{metric} for '{name}' is {stat_value}, but expected it to be better than {value}")  # noqa: E501

    def _interpret_errors(self):
        if len(self.errors) == 0:
            logging.info('All KPIs are good!')
        else:
            for error in self.errors:
                logging.error(f"SLA failed: \n {error}")
            # A non-zero exit code makes the locust process fail, which in turn fails the build job
            self.env.process_exit_code = 1

    def _validate_kpis(self):
        for kpi in self.kpis:
            kpi_keys = list(kpi.keys())
            if len(kpi_keys) > 1:
                raise Exception("Every dict must contain definition for only one endpoint")
            kpi_settings = kpi[kpi_keys[0]]
            if len(kpi_settings) == 0:
                raise Exception(f"No KPI defined for endpoint {kpi_keys[0]}")
            for kpi_setting in kpi_settings:
                (metric, value) = kpi_setting
                if not isinstance(value, (int, float)):
                    raise Exception(f"Provide valid value for '{metric}' metric for endpoint {kpi_keys[0]}")
                if not Metric.has_value(metric):
                    raise Exception(f"Metric {metric} not implemented")

Now you have to register the KpiPlugin class in your Locust script and define your KPIs like this:

from locust import events

# KpiPlugin is the class defined above (or imported from wherever you keep it)
@events.init.add_listener
def on_locust_init(environment, **_kwargs):
    KPI_SETTINGS = [{'/store/inventory': [('percentile_90', 50), ('rps', 500), ('error_rate', 0)]}]
    KpiPlugin(env=environment, kpis=KPI_SETTINGS)

The above script will fail your build if the /store/inventory endpoint misses one of the defined criteria: a 90th percentile of at most 50 ms, at least 500 requests per second, and an error rate of 0%.
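
Since the test runs in a docker-compose setup, the non-zero exit code set via process_exit_code still has to reach the build server. A sketch of one way to do that, assuming the Locust service in docker-compose.yml is named locust (a hypothetical name), is to let docker-compose propagate the container's exit code:

docker-compose up --exit-code-from locust

--exit-code-from implies --abort-on-container-exit, so the stack shuts down when the test finishes and the build step exits with Locust's exit code.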