在加特林场景中产生多个进一步动作的动作

Action to spawn multiple further actions in Gatling scenario

背景

我目前正在使用 gatling 开发压力测试工具的能力分析集。

其中一部分涉及使用滚动查询加载弹性搜索,然后调用更新 API。

我想达到的目标

第 1 步: 运行 滚动启动器并保存 _scroll_id 以供进一步滚动查询使用

第 2 步: 运行 重复滚动查询,作为每个滚动查询的一部分,对返回的每个命中进行修改并将其索引回 elasticsearch,有效从一个滚动查询操作中生成最多 1000 个操作,并对结果进行采样。

第 1 步很简单。第 2 步没那么多。

我试过的

我目前正在尝试通过解析 JSON 格式的结果的 ResponseTransformer 来实现这一点,对每个结果进行修改并为每个尝试另一个 exec(http(...).post(...) etc) 索引的线程触发一个线程更改回 elasticsearch。

基本上,我想我的处理方式是错误的。索引线程永远不会 运行,更不用说 gatling 采样了。

这是我的滚动查询操作的主体:

  ...

  val pool = Executors.newFixedThreadPool(parallelism)

  val query = exec(http("Scroll Query")
    .get(s"/_search/scroll")
    .body(ElFileBody("queries/scrollquery.json")).asJSON // Do the scroll query
    .check(jsonPath("$._scroll_id").saveAs("scroll_id")) // Get the scroll ID from the response
    .transformResponse { case response if response.isReceived =>
      new ResponseWrapper(response) {
        val responseJson = JSON.parseFull(response.body.string)
        // Get the hits and
        val hits = responseJson.get.asInstanceOf[Map[String, Any]]("hits").asInstanceOf[Map[String,Any]]("hits").asInstanceOf[List[Map[String, Any]]]
        for (hit <- hits) {
          val id = hit.get("_id").get.asInstanceOf[String]
          val immutableSource = hit.get("_source").get.asInstanceOf[Map[String, Any]]
          val source = collection.mutable.Map(immutableSource.toSeq: _*) // Make the map mutable
          source("newfield") = "testvalue" // Make a modification
          Thread.sleep(pause) // Pause to simulate topology throughput
          pool.execute(new DocumentIndexer(index, doctype, id, source)) // Create a new thread that executes the index request
        }
      }
    }) // Make some mods and re-index into elasticsearch

  ...

DocumentIndexer 看起来像这样:

class DocumentIndexer(index: String, doctype: String, id: String, source: scala.collection.mutable.Map[String, Any]) extends Runnable {

  ...

  val httpConf = http
    .baseURL(s"http://$host:$port/${index}/${doctype}/${id}")
    .acceptHeader("application/json")
    .doNotTrackHeader("1")
    .disableWarmUp

  override def run() {

    val json = new ObjectMapper().writeValueAsString(source)

    exec(http(s"Index ${id}")
      .post("/_update")
      .body(StringBody(json)).asJSON)

  }

}

问题

  1. 这甚至可以使用加特林机吗?
  2. 我怎样才能达到我想达到的目的?

感谢任何help/suggestions!

可以通过使用 jsonPath 提取 JSON 命中数组并将元素保存到会话中,然后在操作链中使用 foreach 和 exec 来实现此目的- 在循环中执行索引任务,您可以相应地执行索引。

即:
滚动查询

...
  val query = exec(http("Scroll Query")
    .get(s"/_search/scroll")
    .body(ElFileBody("queries/scrollquery.json")).asJSON // Do the scroll query
    .check(jsonPath("$._scroll_id").saveAs("scroll_id")) // Get the scroll ID from the response
    .check(jsonPath("$.hits.hits[*]").ofType[Map[String,Any]].findAll.saveAs("hitsJson")) // Save a List of hit Maps into the session
  )
...

模拟

...
    val scrollQueries = scenario("Enrichment Topologies").exec(ScrollQueryInitiator.query, repeat(numberOfPagesToScrollThrough, "scrollQueryCounter"){
        exec(ScrollQuery.query, pause(10 seconds).foreach("${hitsJson}", "hit"){ exec(HitProcessor.query) })
    })
...

HitProcessor

...
  def getBody(session: Session): String = {
    val hit = session("hit").as[Map[String,Any]]
    val id = hit("_id").asInstanceOf[String]
    val source = mapAsScalaMap(hit("_source").asInstanceOf[java.util.LinkedHashMap[String,Any]])
    source.put("newfield", "testvalue")
    val sourceJson = new ObjectMapper().writeValueAsString(mapAsJavaMap(source))
    val json = s"""{"doc":${sourceJson}}"""
    json
  }

  def getId(session: Session): String = {
    val hit = session("hit").as[Map[String,Any]]
    val id = URLEncoder.encode(hit("_id").asInstanceOf[String], "UTF-8")
    val uri = s"/${index}/${doctype}/${id}/_update"
    uri
  }

  val query = exec(http(s"Index Item")
    .post(session => getId(session))
    .body(StringBody(session => getBody(session))).asJSON)
...

免责声明:此代码仍需优化!而且我实际上还没有学到很多 scala。欢迎提出更好的解决方案

完成这些之后,我现在真正想要实现的是并行化给定数量的索引任务。即:我得到 1000 次点击,我想为每个单独的点击执行一个索引任务,而不是只是迭代它们并一个接一个地执行它们,我想同时执行 10 个。

但是,我认为这是一个单独的问题,真的,所以我会这样提出。