如何为 Spark Structured Streaming 编写 ElasticsearchSink

How to write ElasticsearchSink for Spark Structured streaming

我正在使用 Spark 结构化流处理来自 Kafka 队列的大量数据并进行一些繁重的 ML 计算,但我需要将结果写入 Elasticsearch。

我尝试使用 ForeachWriter 但无法在其中获得 SparkContext,另一个选择可能是在 ForeachWriter 中执行 HTTP Post

现在,我正在考虑编写我自己的 ElasticsearchSink。

是否有任何文档可以为 Spark 结构化流创建接收器?

你可以看看ForeachSink。它展示了如何实现 Sink 并将 DataFrame 转换为 RDD(这非常棘手并且有大量评论)。但是,请注意 Sink API 仍然是私有且不成熟的,将来可能会更改。

如果您使用的是 Spark 2.2+ 和 ES 6.x,那么有一个开箱即用的 ES 接收器:

df
  .writeStream
  .outputMode(OutputMode.Append())
  .format("org.elasticsearch.spark.sql") 
  .option("es.mapping.id", "mappingId")
  .start("index/type") // index/type

如果你像我一样使用 ES 5.x,你需要实现一个 EsSink 和一个 EsSinkProvider:

EsSinkProvider:

class EsSinkProvider extends StreamSinkProvider with DataSourceRegister {

  override def createSink(sqlContext: SQLContext,
                          parameters: Map[String, String],
                          partitionColumns: Seq[String],
                          outputMode: OutputMode): Sink = {

    EsSink(sqlContext, parameters, partitionColumns, outputMode)
  }

  override def shortName(): String = "my-es-sink"
}

EsSink:

case class ElasticSearchSink(sqlContext: SQLContext,
                             options: Map[String, String],
                             partitionColumns: Seq[String],
                             outputMode: OutputMode)
  extends Sink {


  override def addBatch(batchId: Long, df: DataFrame): Unit = synchronized {
    val schema = data.schema
    // this ensures that the same query plan will be used
    val rdd: RDD[String] = df.queryExecution.toRdd.mapPartitions { rows =>
      val converter = CatalystTypeConverters.createToScalaConverter(schema)
      rows.map(converter(_).asInstanceOf[Row]).map(_.getAs[String](0))
    }

    // from org.elasticsearch.spark.rdd library
    EsSpark.saveJsonToEs(rdd, "index/type", Map("es.mapping.id" -> "mappingId"))
  }
}

最后,在写入流时使用此提供程序 class 作为 format:

df
  .writeStream
  .queryName("ES-Writer")
  .outputMode(OutputMode.Append())
  .format("path.to.EsProvider")
  .start()