如何使用 SparkR 读写 ElasticSearch?

How to read and write to ElasticSearch with SparkR?

初学者 SparkR 和 ElasticSearch 问题在这里!

如何将 sparkR 数据帧或 RDD 写入具有多个节点的 ElasticSearch?

存在 specific R package for elastic 但没有提及 hadoop 或分布式数据帧。当我尝试使用它时,出现以下错误:

install.packages("elastic", repos = "http://cran.us.r-project.org")
library(elastic)
df <- read.json('/hadoop/file/location')
connect(es_port = 9200, es_host = 'https://hostname.dev.company.com', es_user = 'username', es_pwd = 'password')
docs_bulk(df)

Error: no 'docs_bulk' method for class SparkDataFrame

如果这是 pyspark,我会使用 , but I can't find any information about it in sparkR from googling. ElasticSearch also has good documentation, but only for Scala and Java

我敢肯定我遗漏了一些明显的东西;任何指导表示赞赏!

要将您的 SparkR 会话连接到 Elasticsearch,您需要使连接器 jar 和您的 ES 配置可用于您的 SparkR 会话。

1: 指定jar(在elasticsearch文档中查找你需要的版本;以下版本适用于spark 2.x、scala 2.11和ES 6.8.0)

sparkPackages <- "org.elasticsearch:elasticsearch-spark-20_2.11:6.8.0"

2:在您的 SparkConfig 中指定您的集群配置。您也可以在此处添加其他 Elasticsearch 配置(当然还有其他 spark 配置)

sparkConfig <- list(es.nodes = "your_comma-separated_es_nodes",
                    es.port = "9200")
  1. 启动 sparkR 会话
sparkR.session(master="your_spark_master", 
               sparkPackages=sparkPackages, 
               sparkConfig=sparkConfig)

  1. 做一些魔术,生成一个你想保存到 ES 的 sparkDataframe

  2. 将您的数据帧写入 ES:

write.df(yourSparkDF, source="org.elasticsearch.spark.sql",
                 path= "your_ES_index_path"
         )