如何从 Spark Structured Streaming 更新 ElasticSearch 中的计数器?

How to Update Counter in ElasticSearch from Spark Structured Streaming?

我正在开发一个 Spark Structured Streaming 项目,目标是 upsert 用户 activity 登录到 ElasticSearch。

问题

  1. user_id在过去8小时内第一次出现时,在ElasticSearch中新建一个条目,并将文档中的counter设置为1;
  2. 如果同一用户在过去 8 小时内有更多活动(日志),则更新 counter 字段,将活动数添加到其值,最后更新 update_time 字段。

设置"es.mapping.id" -> "user_id""es.write.operation" -> "upsert"是我能做到的,但是更新时无法更新计数器和时间。也许 es.update.script.inline 会有帮助?

阅读 ES Scripted Updates document 后,这里有一个使用无痛内联脚本 更新 counter 的简单解决方案。

所以,关键是使用无痛脚本 ctx._source.counter += params.counter,其中 counter 代表我的 DataFrame 列 'counter,应该更早聚合。

毕竟我是这样的:

val esOptions = Map(
   "es.write.operation"      -> "upsert"
  ,"es.mapping.id"           -> "user_id"
  ,"es.update.script.lang"   -> "painless"
  ,"es.update.script.inline" -> "ctx._source.counter += params.counter"
  ,"es.update.script.params" -> "counter:counter"

df.writeStream.options(esOptions)
  .format("org.elasticsearch.spark.sql")
  .start("user_activity/log")

同样,这只能解决计数器更新问题。稍后我会附加更新 update_time 字段的方式。