如何使用 Spark 中的 elasticsearch-hadoop 将数据从一个 Elasticsearch 集群重新索引到另一个集群

How to reindex data from one Elasticsearch cluster to another with elasticsearch-hadoop in Spark

我有两个独立的 Elasticsearch 集群,我想将第一个集群的数据重新索引到第二个集群,但我发现我只能在 SparkContext 配置中设置一个 Elasticsearch 集群,例如:

var sparkConf : SparkConf = new SparkConf()
                     .setAppName("EsReIndex")
sparkConf.set("es.nodes", "node1.cluster1:9200")

那么如何在同一应用程序内部的 Spark 中使用 elastic search-hadoop 在两个 Elasticsearch 集群之间移动数据?

您无需为此在SparkConf 中配置节点地址。

当您使用 elasticsearch 格式的 DataFrameWriter 时,您可以将节点地址作为选项传递,如下所示:

val df = sqlContext.read
                  .format("elasticsearch")
                  .option("es.nodes", "node1.cluster1:9200")
                  .load("your_index/your_type")

df.write
    .option("es.nodes", "node2.cluster2:9200")
    .save("your_new_index/your_new_type")

这应该适用于 spark 1.6.X 和相应的 elasticsearch-hadoop 连接器。