如何将 Spark Structured Streaming 与 Kafka Direct Stream 一起使用?
How to use Spark Structured Streaming with Kafka Direct Stream?
我遇到了 Structured Streaming with Spark,它有一个从 S3 存储桶连续消费并将处理结果写入 MySQL 数据库的示例。
// Read data continuously from an S3 location
val inputDF = spark.readStream.json("s3://logs")
// Do operations using the standard DataFrame API and write to MySQL
inputDF.groupBy($"action", window($"time", "1 hour")).count()
.writeStream.format("jdbc")
.start("jdbc:mysql//...")
如何与 Spark Kafka Streaming 一起使用?
val stream = KafkaUtils.createDirectStream[String, String](
ssc,
PreferConsistent,
Subscribe[String, String](topics, kafkaParams)
)
有没有办法在不使用 stream.foreachRDD(rdd => {})
的情况下结合这两个示例?
Is there a way to combine these two examples without using
stream.foreachRDD(rdd => {})
?
还没有。 Spark 2.0.0 没有对结构化流的 Kafka 接收器支持。这是 Spark Streaming 的创建者之一 Spark 2.1.0 according to Tathagata Das 中应该出现的功能。
Here is the relevant JIRA issue。
编辑:(29/11/2018)
是的,从 Spark 2.2 版开始是可能的。
stream
.writeStream // use `write` for batch, like DataFrame
.format("kafka")
.option("kafka.bootstrap.servers", "brokerhost1:port1,brokerhost2:port2")
.option("topic", "target-topic1")
.start()
查看此 了解更多信息。
编辑:(06/12/2016)
结构化流的 Kafka 0.10 集成现在 expiramentaly supported in Spark 2.0.2:
val ds1 = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.load()
ds1
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
我遇到了类似的问题 w.r.t 从 Kafka 源读取并写入 Cassandra 接收器。在这里创建了一个简单的项目 kafka2spark2cassandra,分享它以防对任何人有帮助。
我遇到了 Structured Streaming with Spark,它有一个从 S3 存储桶连续消费并将处理结果写入 MySQL 数据库的示例。
// Read data continuously from an S3 location
val inputDF = spark.readStream.json("s3://logs")
// Do operations using the standard DataFrame API and write to MySQL
inputDF.groupBy($"action", window($"time", "1 hour")).count()
.writeStream.format("jdbc")
.start("jdbc:mysql//...")
如何与 Spark Kafka Streaming 一起使用?
val stream = KafkaUtils.createDirectStream[String, String](
ssc,
PreferConsistent,
Subscribe[String, String](topics, kafkaParams)
)
有没有办法在不使用 stream.foreachRDD(rdd => {})
的情况下结合这两个示例?
Is there a way to combine these two examples without using
stream.foreachRDD(rdd => {})
?
还没有。 Spark 2.0.0 没有对结构化流的 Kafka 接收器支持。这是 Spark Streaming 的创建者之一 Spark 2.1.0 according to Tathagata Das 中应该出现的功能。 Here is the relevant JIRA issue。
编辑:(29/11/2018)
是的,从 Spark 2.2 版开始是可能的。
stream
.writeStream // use `write` for batch, like DataFrame
.format("kafka")
.option("kafka.bootstrap.servers", "brokerhost1:port1,brokerhost2:port2")
.option("topic", "target-topic1")
.start()
查看此
编辑:(06/12/2016)
结构化流的 Kafka 0.10 集成现在 expiramentaly supported in Spark 2.0.2:
val ds1 = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.load()
ds1
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
我遇到了类似的问题 w.r.t 从 Kafka 源读取并写入 Cassandra 接收器。在这里创建了一个简单的项目 kafka2spark2cassandra,分享它以防对任何人有帮助。