如何使用 spark structured streaming 在 elasticsearch sink 中设置动态 doc id
How to set dynamic doc id in elasticsearch sink using spark structured streaming
在 elasticsearch write sink 中,我应该如何添加来自数据集字段的具有动态值的文档 ID。在我的例子中,我需要根据格式化数据集中的特定字段设置文档 ID。遇到 "es.mapping.id" 但我如何从我的数据集中获取值?
发现只需将字段名称指定为 "es.mapping.id"
的值即可实现此目的
StreamingQuery query = finalData.writeStream()
.outputMode(OutputMode.Append())
.format("org.elasticsearch.spark.sql")
.option("es.mapping.id", "input_key")
.option("checkpointLocation","/tmp/spark-checkpoint")
.start("spark_index/doc");
在 elasticsearch write sink 中,我应该如何添加来自数据集字段的具有动态值的文档 ID。在我的例子中,我需要根据格式化数据集中的特定字段设置文档 ID。遇到 "es.mapping.id" 但我如何从我的数据集中获取值?
发现只需将字段名称指定为 "es.mapping.id"
的值即可实现此目的StreamingQuery query = finalData.writeStream()
.outputMode(OutputMode.Append())
.format("org.elasticsearch.spark.sql")
.option("es.mapping.id", "input_key")
.option("checkpointLocation","/tmp/spark-checkpoint")
.start("spark_index/doc");