结构化流 - Foreach Sink
Structured Streaming - Foreach Sink
我基本上是从 Kafka 源中读取信息,然后将每条消息转储到我的 foreach
处理器(感谢 Jacek 页面提供的简单示例)。
如果这确实有效,我将在此处的 process
方法中实际执行一些业务逻辑,但是,这不起作用。我相信 println
不起作用,因为它在执行程序上的 运行 并且无法将这些日志返回给驱动程序。但是,这个 insert into
临时 table 至少应该可以工作,并告诉我消息实际上被消耗并处理到接收器。
我在这里错过了什么?
真的在寻找另一双眼睛来检查我在这里的努力:
val stream = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", Util.getProperty("kafka10.broker"))
.option("subscribe", src_topic)
.load()
val rec = stream.selectExpr("CAST(value AS STRING) as txnJson").as[(String)]
val df = stream.selectExpr("cast (value as string) as json")
val writer = new ForeachWriter[Row] {
val scon = new SConConnection
override def open(partitionId: Long, version: Long) = {
true
}
override def process(value: Row) = {
println("++++++++++++++++++++++++++++++++++++" + value.get(0))
scon.executeUpdate("insert into rs_kafka10(miscCol) values("+value.get(0)+")")
}
override def close(errorOrNull: Throwable) = {
scon.closeConnection
}
}
val yy = df.writeStream
.queryName("ForEachQuery")
.foreach(writer)
.outputMode("append")
.start()
yy.awaitTermination()
感谢 Harald 和其他人的评论,我发现了几件事,这使我实现了正常的处理行为 -
- 使用本地模式测试代码,yarn 不是最大的调试帮助
- 由于某些原因,foreach sink 的process 方法不允许调用其他方法。当我将我的业务逻辑直接放在那里时,它起作用了。
希望对大家有帮助。
我基本上是从 Kafka 源中读取信息,然后将每条消息转储到我的 foreach
处理器(感谢 Jacek 页面提供的简单示例)。
如果这确实有效,我将在此处的 process
方法中实际执行一些业务逻辑,但是,这不起作用。我相信 println
不起作用,因为它在执行程序上的 运行 并且无法将这些日志返回给驱动程序。但是,这个 insert into
临时 table 至少应该可以工作,并告诉我消息实际上被消耗并处理到接收器。
我在这里错过了什么?
真的在寻找另一双眼睛来检查我在这里的努力:
val stream = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", Util.getProperty("kafka10.broker"))
.option("subscribe", src_topic)
.load()
val rec = stream.selectExpr("CAST(value AS STRING) as txnJson").as[(String)]
val df = stream.selectExpr("cast (value as string) as json")
val writer = new ForeachWriter[Row] {
val scon = new SConConnection
override def open(partitionId: Long, version: Long) = {
true
}
override def process(value: Row) = {
println("++++++++++++++++++++++++++++++++++++" + value.get(0))
scon.executeUpdate("insert into rs_kafka10(miscCol) values("+value.get(0)+")")
}
override def close(errorOrNull: Throwable) = {
scon.closeConnection
}
}
val yy = df.writeStream
.queryName("ForEachQuery")
.foreach(writer)
.outputMode("append")
.start()
yy.awaitTermination()
感谢 Harald 和其他人的评论,我发现了几件事,这使我实现了正常的处理行为 -
- 使用本地模式测试代码,yarn 不是最大的调试帮助
- 由于某些原因,foreach sink 的process 方法不允许调用其他方法。当我将我的业务逻辑直接放在那里时,它起作用了。
希望对大家有帮助。