序列化 Elasticsearch 客户端,以便在 Scala 中循环 Spark RDD 时使用

Serializing Elasticsearch clients for use when looping Spark RDD in Scala

需要执行一系列异步作业,每个作业都需要查询 Elasticsearch,然后开始处理查询结果。

希望利用 Apache Spark 的独立集群模式将作业分配到多台机器,我们使用了 Spark Streaming -- foreachRDD, as well as at Spark Streaming connection pool in each JVM and 中解释的 RDD.foreachPartition 代码模式。

尽管该模式似乎应该扩展到 Elasticsearch,但我们尚未在博客领域找到任何具体的示例来执行以下操作:

job_list_RDD.foreachPartition(RDDpartition => {

  val PartitionClient = Connection.conn()

  RDDpartition.foreach(hostmetric => {
  hostmetric._2 match {
    case "cpu" => {
      generateCPUPlots(PartitionClient, guidid, hostmetric._1, hostmetric._3, lab_index)
    }
    case "mem" => {
      generateMemPlots(PartitionClient, guidid, hostmetric._1, lab_index)
    }
    case _ => {
      logger.debug("Unexpected metric")
    }
  }
  })

  PartitionClient.close()   // SHOULD THIS LINE BE REMOVED ?
})

当连接是 Elasticsearch PreBuiltTransportClient 时:

object Connection {
  def conn() = {
    val port = 9300
    val NodeName = "node4"

    val settings = (new MySettings).settings
    val client = new PreBuiltTransportClient(settings)
     .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(NodeName), port))
    client
  }
}

class MySettings extends Serializable {
  val port = 9300
  val NodeName = "node4"
  val ESClusterName = "sparcplugs"
  val nodes = List(NodeName)
  val addresses = nodes.map { host => new InetSocketTransportAddress(InetAddress.getByName(host), port) }
  val DefaultTimeout = "30s"

  @transient lazy val settingBuilder = Settings.builder()
  val settings = settingBuilder
    .put("cluster.name", ESClusterName)
    .put("client.transport.ping_timeout", DefaultTimeout) //The time to wait for a ping response from a node. Defaults to 5s
   .build()
}

PreBuiltTransportClient 和 Settings.builder() 是 java API 的一部分,与 org.elasticsearch:elasticsearch-5.5.1.jar, org.elasticsearch.client:transport-5.5.1.jar 和 org.elasticsearch.plugin:transport-netty4-client-5.5.1.jar.

请注意,需要 MySettings class 来围绕 org.elasticsearch.common.settings.Settings.Builder 对象(Settings.builder() 的值)创建一个可序列化的包装器,这导致在 job_list_RDD.foreachPartition 循环中以其他方式构建 PreBuiltTransportClient 时出现对象不可序列化异常。

因为这段代码执行时没有序列化异常,并且将执行一些但不是全部的作业,即。它没有完全迭代 job_list_RDD.

在迭代完成之前抛出这两个异常(按此顺序):

[ERROR] 2018-03-21 16:16:58,907 org.apache.spark.util.SparkUncaughtExceptionHandler logError - Uncaught exception in thread Thread[elasticsearch[_client_][generic][T#3],5,main] java.lang.NoSuchMethodError: io.netty.bootstrap.Bootstrap.config()Lio/netty/bootstrap/BootstrapConfig;

[ERROR] 2018-03-21 16:25:50,505 com.thales.sparc.logger.DefaultLog$ generatePlots - org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 0.0 failed 4 times, most recent failure: Lost task 1.3 in stage 0.0 (TID 38, 192.168.200.105, executor 5): ExecutorLostFailure (executor 5 exited caused by one of the running tasks) Reason: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.

第二个异常"Remote RPC client disassociated. Likely due to containers exceeding thresholds"中的消息似乎表明 Spark Worker 资源正在超限,这表明 Elasticsearch 客户端连接管理不善,但这些消息是我们在日志中可以找到的唯一反馈。

奇怪的是,如果行

PartitionClient.close()

(在第一个代码段)被注释掉,程序无一例外地完成,但是作业仍然没有全部完成(job_list_RDD 仍然没有完全迭代)。关闭为 RDD 分区打开的连接是 Spark Streaming -- foreachRDD.

中解释的代码模式的一部分

同样的 RDD.foreachParition 代码模式在应用于 MySQL 会话时对我来说效果很好。有没有其他人用 Elasticsearch 试过这个?

连接池在这里有帮助吗?如果是这样,您是否有任何特定的示例 java 或 Elasticsearch 连接池的 Scala 代码?

提前致谢--

事实证明,Elasticsearch 传输客户端 (org.elasticsearch.plugin:transport-netty4-client-5.5.1.jar) 对 Netty 包的使用与 Netty 的使用存在冲突阿帕奇星火。这是 Elasticsearch 开发人员 James Baiera (Serializing Elasticsearch clients) 所说的。

解决方案是使用 Elasticsearch REST 客户端而不是传输客户端 (Java REST Client)。这需要更新我们的 Elasticsearch 安装,因为直到 5.6 版才支持完整的 Java REST API。

那时不需要 foreachPartition 模式,也不需要显式地使 REST 客户端可序列化("extends Serializable")。工作代码模式是:

class RestManager {
  val client: RestHighLevelClient = new RestHighLevelClient(
    RestClient.builder(new HttpHost("micah", 9200, "http")))
}

object myRestClient extends RestManager

job_list_RDD.foreach(job => {
  ..
  myRestClient.client.search(new SearchRequest(ES_index)
    .source(new SearchSourceBuilder.query( .. )))
  ..
})