如何使用 Spark 中的 elasticsearch-hadoop 将数据从一个 Elasticsearch 集群重新索引到另一个集群
How to reindex data from one Elasticsearch cluster to another with elasticsearch-hadoop in Spark
我有两个独立的 Elasticsearch 集群,我想将第一个集群的数据重新索引到第二个集群,但我发现我只能在 SparkContext 配置中设置一个 Elasticsearch 集群,例如:
var sparkConf : SparkConf = new SparkConf()
.setAppName("EsReIndex")
sparkConf.set("es.nodes", "node1.cluster1:9200")
那么如何在同一应用程序内部的 Spark 中使用 elastic search-hadoop 在两个 Elasticsearch 集群之间移动数据?
您无需为此在SparkConf 中配置节点地址。
当您使用 elasticsearch
格式的 DataFrameWriter 时,您可以将节点地址作为选项传递,如下所示:
val df = sqlContext.read
.format("elasticsearch")
.option("es.nodes", "node1.cluster1:9200")
.load("your_index/your_type")
df.write
.option("es.nodes", "node2.cluster2:9200")
.save("your_new_index/your_new_type")
这应该适用于 spark 1.6.X 和相应的 elasticsearch-hadoop 连接器。
我有两个独立的 Elasticsearch 集群,我想将第一个集群的数据重新索引到第二个集群,但我发现我只能在 SparkContext 配置中设置一个 Elasticsearch 集群,例如:
var sparkConf : SparkConf = new SparkConf()
.setAppName("EsReIndex")
sparkConf.set("es.nodes", "node1.cluster1:9200")
那么如何在同一应用程序内部的 Spark 中使用 elastic search-hadoop 在两个 Elasticsearch 集群之间移动数据?
您无需为此在SparkConf 中配置节点地址。
当您使用 elasticsearch
格式的 DataFrameWriter 时,您可以将节点地址作为选项传递,如下所示:
val df = sqlContext.read
.format("elasticsearch")
.option("es.nodes", "node1.cluster1:9200")
.load("your_index/your_type")
df.write
.option("es.nodes", "node2.cluster2:9200")
.save("your_new_index/your_new_type")
这应该适用于 spark 1.6.X 和相应的 elasticsearch-hadoop 连接器。