如何插入到spark中的elasticsearch?

How to upsert into elasticsearch in spark?

使用 HTTP POST,以下脚本可以插入新字段 createtime 或更新 lastupdatetime:

curl -XPOST 'localhost:9200/test/type1/1/_update' -d '{
"doc": {
    "lastupdatetime": "2015-09-16T18:00:00"
}
"upsert" : {
    "createtime": "2015-09-16T18:00:00"
    "lastupdatetime": "2015-09-16T18:00",
}
}'

但是在spark脚本中,设置"es.write.operation": "upsert"后,我完全不知道如何插入createtimeofficial document里面只有es.update.script.*...那么,谁能举个例子吗?

UPDATE:就我而言,我想将 android 设备的信息从登录保存到 one elasticsearch 类型, 并将其首次出现时间设置为 createtime。如果设备再次出现,我只更新 lastupdatetime,但保持 createtime 不变。

所以文档id是android ID,如果id存在,更新lastupdatetime,否则插入createtimelastupdatetime。所以设置这是(在 python 中):

conf = {
    "es.resource.write": "stats-device/activation",
    "es.nodes": "NODE1:9200",
    "es.write.operation": "upsert",
    "es.mapping.id": "id"
    # ???
}

rdd.saveAsNewAPIHadoopFile(
    path='-',
    outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
    keyClass="org.apache.hadoop.io.NullWritable",
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
    conf=conf
)

我只是不知道如何在 id 不存在的情况下插入 new 字段。

没有看到你的Spark脚本,很难给出详细的答案。但通常你会想要使用 elasticsearch-hadoop(因此你需要将该依赖项添加到你的 Build.sbt 文件中,例如)然后在你的脚本中你可以:

import org.elasticsearch.spark._ 
val documents = sc.parallelize(Seq(Map(
                                   "id" -> 1, 
                                   "createtime" -> "2015-09-16T18:00:00"
                                   "lastupdatetime" -> "2015-09-16T18:00"),
                                  Map(<next document>), ...)
                   .saveToEs("test/type1", Map("es.mapping.id" -> "id"))

根据 official docs。 saveToES 的第二个参数指定地图的 RDD 中的哪个键用作 ElasticSearch 文档 ID。

当然,如果您使用 Spark 执行此操作,则意味着您的行数多于您想要手动输入的行数,因此对于您的情况,您需要将数据转换为 RDD键 -> 脚本中的值的映射。但由于不知道数据来源,我无法深入了解更多细节。

最后,我得到了一个不完美的解决方案:

  1. createtime 添加到所有源文档;
  2. create方法保存到es并忽略已经创建的错误;
  3. 删除 createtime 字段;
  4. update方法再次保存到es;

目前(2015-09-27),步骤2可以通过this patch实现。