如何插入到spark中的elasticsearch?
How to upsert into elasticsearch in spark?
使用 HTTP POST,以下脚本可以插入新字段 createtime
或更新 lastupdatetime
:
curl -XPOST 'localhost:9200/test/type1/1/_update' -d '{
"doc": {
"lastupdatetime": "2015-09-16T18:00:00"
}
"upsert" : {
"createtime": "2015-09-16T18:00:00"
"lastupdatetime": "2015-09-16T18:00",
}
}'
但是在spark脚本中,设置"es.write.operation": "upsert"
后,我完全不知道如何插入createtime
。 official document里面只有es.update.script.*
...那么,谁能举个例子吗?
UPDATE:就我而言,我想将 android 设备的信息从登录保存到 one elasticsearch 类型, 并将其首次出现时间设置为 createtime
。如果设备再次出现,我只更新 lastupdatetime
,但保持 createtime
不变。
所以文档id
是android ID,如果id存在,更新lastupdatetime
,否则插入createtime
和lastupdatetime
。所以设置这是(在 python 中):
conf = {
"es.resource.write": "stats-device/activation",
"es.nodes": "NODE1:9200",
"es.write.operation": "upsert",
"es.mapping.id": "id"
# ???
}
rdd.saveAsNewAPIHadoopFile(
path='-',
outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
keyClass="org.apache.hadoop.io.NullWritable",
valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
conf=conf
)
我只是不知道如何在 id
不存在的情况下插入 new 字段。
没有看到你的Spark脚本,很难给出详细的答案。但通常你会想要使用 elasticsearch-hadoop(因此你需要将该依赖项添加到你的 Build.sbt 文件中,例如)然后在你的脚本中你可以:
import org.elasticsearch.spark._
val documents = sc.parallelize(Seq(Map(
"id" -> 1,
"createtime" -> "2015-09-16T18:00:00"
"lastupdatetime" -> "2015-09-16T18:00"),
Map(<next document>), ...)
.saveToEs("test/type1", Map("es.mapping.id" -> "id"))
根据 official docs。 saveToES 的第二个参数指定地图的 RDD 中的哪个键用作 ElasticSearch 文档 ID。
当然,如果您使用 Spark 执行此操作,则意味着您的行数多于您想要手动输入的行数,因此对于您的情况,您需要将数据转换为 RDD键 -> 脚本中的值的映射。但由于不知道数据来源,我无法深入了解更多细节。
最后,我得到了一个不完美的解决方案:
- 将
createtime
添加到所有源文档;
- 用
create
方法保存到es并忽略已经创建的错误;
- 删除
createtime
字段;
- 用
update
方法再次保存到es;
目前(2015-09-27),步骤2可以通过this patch实现。
使用 HTTP POST,以下脚本可以插入新字段 createtime
或更新 lastupdatetime
:
curl -XPOST 'localhost:9200/test/type1/1/_update' -d '{
"doc": {
"lastupdatetime": "2015-09-16T18:00:00"
}
"upsert" : {
"createtime": "2015-09-16T18:00:00"
"lastupdatetime": "2015-09-16T18:00",
}
}'
但是在spark脚本中,设置"es.write.operation": "upsert"
后,我完全不知道如何插入createtime
。 official document里面只有es.update.script.*
...那么,谁能举个例子吗?
UPDATE:就我而言,我想将 android 设备的信息从登录保存到 one elasticsearch 类型, 并将其首次出现时间设置为 createtime
。如果设备再次出现,我只更新 lastupdatetime
,但保持 createtime
不变。
所以文档id
是android ID,如果id存在,更新lastupdatetime
,否则插入createtime
和lastupdatetime
。所以设置这是(在 python 中):
conf = {
"es.resource.write": "stats-device/activation",
"es.nodes": "NODE1:9200",
"es.write.operation": "upsert",
"es.mapping.id": "id"
# ???
}
rdd.saveAsNewAPIHadoopFile(
path='-',
outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
keyClass="org.apache.hadoop.io.NullWritable",
valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
conf=conf
)
我只是不知道如何在 id
不存在的情况下插入 new 字段。
没有看到你的Spark脚本,很难给出详细的答案。但通常你会想要使用 elasticsearch-hadoop(因此你需要将该依赖项添加到你的 Build.sbt 文件中,例如)然后在你的脚本中你可以:
import org.elasticsearch.spark._
val documents = sc.parallelize(Seq(Map(
"id" -> 1,
"createtime" -> "2015-09-16T18:00:00"
"lastupdatetime" -> "2015-09-16T18:00"),
Map(<next document>), ...)
.saveToEs("test/type1", Map("es.mapping.id" -> "id"))
根据 official docs。 saveToES 的第二个参数指定地图的 RDD 中的哪个键用作 ElasticSearch 文档 ID。
当然,如果您使用 Spark 执行此操作,则意味着您的行数多于您想要手动输入的行数,因此对于您的情况,您需要将数据转换为 RDD键 -> 脚本中的值的映射。但由于不知道数据来源,我无法深入了解更多细节。
最后,我得到了一个不完美的解决方案:
- 将
createtime
添加到所有源文档; - 用
create
方法保存到es并忽略已经创建的错误; - 删除
createtime
字段; - 用
update
方法再次保存到es;
目前(2015-09-27),步骤2可以通过this patch实现。