elasticsearch-hadoop spark 连接器无法 connect/write 使用开箱即用的 ES 服务器设置和默认库设置

elasticsearch-hadoop spark connector unable to connect/write using out-of-box ES server setup, & default library settings

我在使用此处描述的适用于 Spark 的 Elasticsearch 连接器时遇到了一些问题:https://www.elastic.co/guide/en/elasticsearch/hadoop/master/spark.html。我什至无法在他们的页面上找到使用 Elasticsearch 7.4.0 的普通实例的示例 通过

下载并启动
<downloadDir>/bin/elasticsearch 

这是我对 运行 所做的。 我通过以下命令启动了 Spark:

spark-shell --packages "org.elasticsearch:elasticsearch-hadoop:7.4.0"

然后我输入了上面引用的文档页面上给出的代码行:

import org.apache.spark.SparkContext        // attempt 1
import org.apache.spark.SparkContext._
import org.elasticsearch.spark._

val numbers = Map("one" -> 1, "two" -> 2, "three" -> 3)
val airports = Map("arrival" -> "Otopeni", "SFO" -> "San Fran")

spark.sparkContext.makeRDD( Seq(numbers, airports)).saveToEs("spark/docs")

我收到一些奇怪的错误,表明 ES 正在连接到默认主节点以外的其他节点 [127.0.0.1:9200],然后即使是那个节点也失败了:

[Stage 0:>                                                        (0 + 12) / 12]20/10/13 19:39:21 ERROR NetworkClient: Node [172.20.0.3:9200] failed (org.apache.commons.httpclient.ConnectTimeoutException: The host did not accept the connection within timeout of 60000 ms); selected next node [127.0.0.1:9200]
20/10/13 19:39:21 ERROR NetworkClient: Node [172.20.0.3:9200] failed (org.apache.commons.httpclient.ConnectTimeoutException: The host did not accept the connection within timeout of 60000 ms); selected next node [127.0.0.1:9200]

请注意,如果我在浏览器 URL 栏中键入 http://127.0.0.1:9200/,我会返回一个 JSON 文档,指示集群已在 localhost:9200 上运行。 所以,我很难过!非常感谢任何指导。

** 更新**

我尝试了 Mikalai 建议的答案(必须通过 RDD 调用 saveToES,而不是 Dataframe,因为由于某种原因它没有编译)。不幸的是,得到了同样的结果。

import org.apache.spark.rdd.RDD   // attempt 2
import org.apache.spark.sql.SparkSession
import org.elasticsearch.spark._

object classes {
  case class AlbumIndex(group: String, year: Int,  title: String)

}
object App extends App {
  import classes._
  val spark = SparkSession .builder() .appName("writetoes") .master("local[*]") .config("spark.es.nodes","localhost").config("spark.es.port","9200").getOrCreate()
  val indexDocuments: Seq[AlbumIndex] = Seq(
      AlbumIndex("Led Zeppelin",1969,"Led Zeppelin"),
      AlbumIndex("Boston",1976,"Boston"),
      AlbumIndex("Fleetwood Mac", 1979,"Tusk")
  )
  val rdd: RDD[AlbumIndex] = spark.sparkContext.makeRDD( indexDocuments)
  rdd.saveToEs("demoindex/albumindex")
}

您需要配置 elasticsearch 端口和 ip 运行 请在下面找到我认为这会对您有所帮助。

val spark = SparkSession
    .builder()
    .appName("writetoes")
    .master("local[*]")
    .config("spark.es.nodes","localhost")//give your elastic node ip
    .config("spark.es.port","9200")//port where its running
    .getOrCreate()

import spark.implicits._

val indexDocuments = Seq(
    AlbumIndex("Led Zeppelin",1969,"Led Zeppelin"),
    AlbumIndex("Boston",1976,"Boston"),
    AlbumIndex("Fleetwood Mac", 1979,"Tusk")
).toDF

indexDocuments.saveToEs("demoindex/albumindex")

请注意,172.0.0.0 网络 space 是 IP 的私有网络范围。您的 Elasticsearch 节点很可能选择其中一个地址作为绑定地址,而不是 127.0.0.1。 ES-Hadoop/Spark 尝试在写入任何内容之前“发现”您的集群。该发现过程的一部分涉及从给定的节点列表中随机联系一个节点,并向其询问集群中所有节点的 IP 地址。您的 Elasticsearch 节点可能认为它应该可以在 172.x.x.x 上访问并且连接器正在选择该地址作为发现过程的一部分并尝试将其用于所有未来的通信,即使无法建立 IP 连接到该地址(出于多种原因)。

您应该能够为这些类型的本地运行禁用节点发现。这会将 ES-Hadoop/Spark 连接器切换为不尝试查找群集中尚未在 es.nodes 设置中指定的任何节点。您可以通过将 es.nodes.discovery 属性 设置为 false 来执行此操作。在 Spark 中,你需要在它前面加上 spark. 否则 Spark 会抛出 属性。

SparkSession.builder()
      .appName("my-app")
      .config("spark.es.nodes", "localhost")
      .config("spark.es.port", "9200")
      .config("spark.es.nodes.discovery", false)
      .getOrCreate()

所以,问题是我在另一个 window 监听同一个端口的另一个 elasticsearch 实例。这总是以奇怪的方式处理事情。所以.. 这个适配器完全没有问题。问题是我。