如何从 Spark Structured Streaming 更新 ElasticSearch 中的计数器?
How to Update Counter in ElasticSearch from Spark Structured Streaming?
我正在开发一个 Spark Structured Streaming 项目,目标是 upsert 用户 activity 登录到 ElasticSearch。
问题:
- 当
user_id
在过去8小时内第一次出现时,在ElasticSearch中新建一个条目,并将文档中的counter
设置为1;
- 如果同一用户在过去 8 小时内有更多活动(日志),则更新
counter
字段,将活动数添加到其值,最后更新 update_time
字段。
设置"es.mapping.id" -> "user_id"
和"es.write.operation" -> "upsert"
是我能做到的,但是更新时无法更新计数器和时间。也许 es.update.script.inline
会有帮助?
阅读 ES Scripted Updates document 后,这里有一个使用无痛内联脚本 更新 counter
的简单解决方案。
所以,关键是使用无痛脚本 ctx._source.counter += params.counter
,其中 counter
代表我的 DataFrame 列 'counter
,应该更早聚合。
毕竟我是这样的:
val esOptions = Map(
"es.write.operation" -> "upsert"
,"es.mapping.id" -> "user_id"
,"es.update.script.lang" -> "painless"
,"es.update.script.inline" -> "ctx._source.counter += params.counter"
,"es.update.script.params" -> "counter:counter"
df.writeStream.options(esOptions)
.format("org.elasticsearch.spark.sql")
.start("user_activity/log")
同样,这只能解决计数器更新问题。稍后我会附加更新 update_time
字段的方式。
我正在开发一个 Spark Structured Streaming 项目,目标是 upsert 用户 activity 登录到 ElasticSearch。
问题:
- 当
user_id
在过去8小时内第一次出现时,在ElasticSearch中新建一个条目,并将文档中的counter
设置为1; - 如果同一用户在过去 8 小时内有更多活动(日志),则更新
counter
字段,将活动数添加到其值,最后更新update_time
字段。
设置"es.mapping.id" -> "user_id"
和"es.write.operation" -> "upsert"
是我能做到的,但是更新时无法更新计数器和时间。也许 es.update.script.inline
会有帮助?
阅读 ES Scripted Updates document 后,这里有一个使用无痛内联脚本 更新 counter
的简单解决方案。
所以,关键是使用无痛脚本 ctx._source.counter += params.counter
,其中 counter
代表我的 DataFrame 列 'counter
,应该更早聚合。
毕竟我是这样的:
val esOptions = Map(
"es.write.operation" -> "upsert"
,"es.mapping.id" -> "user_id"
,"es.update.script.lang" -> "painless"
,"es.update.script.inline" -> "ctx._source.counter += params.counter"
,"es.update.script.params" -> "counter:counter"
df.writeStream.options(esOptions)
.format("org.elasticsearch.spark.sql")
.start("user_activity/log")
同样,这只能解决计数器更新问题。稍后我会附加更新 update_time
字段的方式。